This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 488f3923c72 [SPARK-38404][SQL] Improve CTE resolution when a nested
CTE references an outer CTE
488f3923c72 is described below
commit 488f3923c72f636c6ebb01c245609c51c3b68b67
Author: Peter Toth <[email protected]>
AuthorDate: Thu Apr 21 00:37:28 2022 +0800
[SPARK-38404][SQL] Improve CTE resolution when a nested CTE references an outer CTE
### What changes were proposed in this pull request?
Please note that the bug reported in
[SPARK-38404](https://issues.apache.org/jira/browse/SPARK-38404) has
already been fixed by https://github.com/apache/spark/pull/34929.
This PR makes a minor improvement to the current implementation: it passes
the already resolved outer CTE definitions down when resolving nested CTEs,
so that CTE definitions that have already been collected no longer need to be
re-substituted afterwards.
### Why are the changes needed?
A small code improvement, plus additional tests.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a new test case.
Closes #36146 from peter-toth/SPARK-38404-nested-cte-references-outer-cte.
Authored-by: Peter Toth <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/catalyst/analysis/CTESubstitution.scala | 71 ++++++++++------------
.../test/resources/sql-tests/inputs/cte-nested.sql | 13 +++-
.../resources/sql-tests/results/cte-legacy.sql.out | 19 +++++-
.../resources/sql-tests/results/cte-nested.sql.out | 18 +++++-
.../sql-tests/results/cte-nonlegacy.sql.out | 18 +++++-
5 files changed, 96 insertions(+), 43 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index 976a5d385d8..62ebfa83431 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.analysis
-import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{Command, CTERelationDef,
CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias,
UnresolvedWith, WithCTE}
@@ -55,27 +55,27 @@ object CTESubstitution extends Rule[LogicalPlan] {
case _: Command | _: ParsedStatement | _: InsertIntoDir => true
case _ => false
}
- val cteDefs = mutable.ArrayBuffer.empty[CTERelationDef]
+ val cteDefs = ArrayBuffer.empty[CTERelationDef]
val (substituted, lastSubstituted) =
LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match
{
case LegacyBehaviorPolicy.EXCEPTION =>
assertNoNameConflictsInCTE(plan)
- traverseAndSubstituteCTE(plan, isCommand, cteDefs)
+ traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs)
case LegacyBehaviorPolicy.LEGACY =>
(legacyTraverseAndSubstituteCTE(plan, cteDefs), None)
case LegacyBehaviorPolicy.CORRECTED =>
- traverseAndSubstituteCTE(plan, isCommand, cteDefs)
+ traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs)
}
if (cteDefs.isEmpty) {
substituted
} else if (substituted eq lastSubstituted.get) {
- WithCTE(substituted, cteDefs.sortBy(_.id).toSeq)
+ WithCTE(substituted, cteDefs.toSeq)
} else {
var done = false
substituted.resolveOperatorsWithPruning(_ => !done) {
case p if p eq lastSubstituted.get =>
done = true
- WithCTE(p, cteDefs.sortBy(_.id).toSeq)
+ WithCTE(p, cteDefs.toSeq)
}
}
}
@@ -98,7 +98,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
val resolver = conf.resolver
plan match {
case UnresolvedWith(child, relations) =>
- val newNames = mutable.ArrayBuffer.empty[String]
+ val newNames = ArrayBuffer.empty[String]
newNames ++= outerCTERelationNames
relations.foreach {
case (name, relation) =>
@@ -121,11 +121,11 @@ object CTESubstitution extends Rule[LogicalPlan] {
private def legacyTraverseAndSubstituteCTE(
plan: LogicalPlan,
- cteDefs: mutable.ArrayBuffer[CTERelationDef]): LogicalPlan = {
+ cteDefs: ArrayBuffer[CTERelationDef]): LogicalPlan = {
plan.resolveOperatorsUp {
case UnresolvedWith(child, relations) =>
val resolvedCTERelations =
- resolveCTERelations(relations, isLegacy = true, isCommand = false,
cteDefs)
+ resolveCTERelations(relations, isLegacy = true, isCommand = false,
Seq.empty, cteDefs)
substituteCTE(child, alwaysInline = true, resolvedCTERelations)
}
}
@@ -170,21 +170,23 @@ object CTESubstitution extends Rule[LogicalPlan] {
* SELECT * FROM t
* )
* @param plan the plan to be traversed
- * @return the plan where CTE substitution is applied
+ * @param isCommand if this is a command
+ * @param outerCTEDefs already resolved outer CTE definitions with names
+ * @param cteDefs all accumulated CTE definitions
+ * @return the plan where CTE substitution is applied and optionally the
last substituted `With`
+ * where CTE definitions will be gathered to
*/
private def traverseAndSubstituteCTE(
plan: LogicalPlan,
isCommand: Boolean,
- cteDefs: mutable.ArrayBuffer[CTERelationDef]): (LogicalPlan,
Option[LogicalPlan]) = {
+ outerCTEDefs: Seq[(String, CTERelationDef)],
+ cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan,
Option[LogicalPlan]) = {
var lastSubstituted: Option[LogicalPlan] = None
val newPlan = plan.resolveOperatorsUpWithPruning(
_.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) {
case UnresolvedWith(child: LogicalPlan, relations) =>
val resolvedCTERelations =
- resolveCTERelations(relations, isLegacy = false, isCommand, cteDefs)
- if (!isCommand) {
- cteDefs ++= resolvedCTERelations.map(_._2)
- }
+ resolveCTERelations(relations, isLegacy = false, isCommand,
outerCTEDefs, cteDefs)
lastSubstituted = Some(substituteCTE(child, isCommand,
resolvedCTERelations))
lastSubstituted.get
@@ -200,10 +202,14 @@ object CTESubstitution extends Rule[LogicalPlan] {
relations: Seq[(String, SubqueryAlias)],
isLegacy: Boolean,
isCommand: Boolean,
- cteDefs: mutable.ArrayBuffer[CTERelationDef]): Seq[(String,
CTERelationDef)] = {
- val resolvedCTERelations = new mutable.ArrayBuffer[(String,
CTERelationDef)](relations.size)
+ outerCTEDefs: Seq[(String, CTERelationDef)],
+ cteDefs: ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = {
+ var resolvedCTERelations = if (isLegacy || isCommand) {
+ Seq.empty
+ } else {
+ outerCTEDefs
+ }
for ((name, relation) <- relations) {
- val lastCTEDefCount = cteDefs.length
val innerCTEResolved = if (isLegacy) {
// In legacy mode, outer CTE relations take precedence. Here we don't
resolve the inner
// `With` nodes, later we will substitute `UnresolvedRelation`s with
outer CTE relations.
@@ -221,31 +227,18 @@ object CTESubstitution extends Rule[LogicalPlan] {
// WITH t3 AS (SELECT * FROM t1)
// )
// t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`.
- traverseAndSubstituteCTE(relation, isCommand, cteDefs)._1
- }
-
- if (cteDefs.length > lastCTEDefCount) {
- // We have added more CTE relations to the `cteDefs` from the inner
CTE, and these relations
- // should also be substituted with `resolvedCTERelations` as inner CTE
relation can refer to
- // outer CTE relation. For example:
- // WITH t1 AS (SELECT 1)
- // t2 AS (
- // WITH t3 AS (SELECT * FROM t1)
- // )
- for (i <- lastCTEDefCount until cteDefs.length) {
- val substituted =
- substituteCTE(cteDefs(i).child, isLegacy || isCommand,
resolvedCTERelations.toSeq)
- cteDefs(i) = cteDefs(i).copy(child = substituted)
- }
+ traverseAndSubstituteCTE(relation, isCommand, resolvedCTERelations,
cteDefs)._1
}
-
// CTE definition can reference a previous one
- val substituted =
- substituteCTE(innerCTEResolved, isLegacy || isCommand,
resolvedCTERelations.toSeq)
+ val substituted = substituteCTE(innerCTEResolved, isLegacy || isCommand,
resolvedCTERelations)
val cteRelation = CTERelationDef(substituted)
- resolvedCTERelations += (name -> cteRelation)
+ if (!(isLegacy || isCommand)) {
+ cteDefs += cteRelation
+ }
+ // Prepending new CTEs makes sure that those have higher priority over
outer ones.
+ resolvedCTERelations +:= (name -> cteRelation)
}
- resolvedCTERelations.toSeq
+ resolvedCTERelations
}
private def substituteCTE(
diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql
b/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql
index 3b64b5daa82..b5d7fa5687b 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql
@@ -135,4 +135,15 @@ WITH abc AS (SELECT 1)
SELECT (
WITH aBc AS (SELECT 2)
SELECT * FROM aBC
-);
\ No newline at end of file
+);
+
+-- SPARK-38404: CTE in CTE definition references outer
+WITH
+ t1 AS (SELECT 1),
+ t2 AS (
+ WITH t3 AS (
+ SELECT * FROM t1
+ )
+ SELECT * FROM t3
+ )
+SELECT * FROM t2;
\ No newline at end of file
diff --git a/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out
b/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out
index 4d0e5ea829d..db7d420a745 100644
--- a/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 16
+-- Number of queries: 17
-- !query
@@ -219,3 +219,20 @@ SELECT (
struct<scalarsubquery():int>
-- !query output
1
+
+
+-- !query
+WITH
+ t1 AS (SELECT 1),
+ t2 AS (
+ WITH t3 AS (
+ SELECT * FROM t1
+ )
+ SELECT * FROM t3
+ )
+SELECT * FROM t2
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Table or view not found: t1; line 5 pos 20
diff --git a/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out
b/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out
index a8db4599daf..f714a11d1df 100644
--- a/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 16
+-- Number of queries: 17
-- !query
@@ -227,3 +227,19 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Name aBc is ambiguous in nested CTE. Please set
spark.sql.legacy.ctePrecedencePolicy to CORRECTED so that name defined in inner
CTE takes precedence. If set it to LEGACY, outer CTE definitions will take
precedence. See more details in SPARK-28228.
+
+
+-- !query
+WITH
+ t1 AS (SELECT 1),
+ t2 AS (
+ WITH t3 AS (
+ SELECT * FROM t1
+ )
+ SELECT * FROM t3
+ )
+SELECT * FROM t2
+-- !query schema
+struct<1:int>
+-- !query output
+1
diff --git
a/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out
b/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out
index 74394ee3ffc..2ab13003d04 100644
--- a/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 16
+-- Number of queries: 17
-- !query
@@ -219,3 +219,19 @@ SELECT (
struct<scalarsubquery():int>
-- !query output
2
+
+
+-- !query
+WITH
+ t1 AS (SELECT 1),
+ t2 AS (
+ WITH t3 AS (
+ SELECT * FROM t1
+ )
+ SELECT * FROM t3
+ )
+SELECT * FROM t2
+-- !query schema
+struct<1:int>
+-- !query output
+1
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]