This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 6a91b1d063a8 [SPARK-56921][SQL] Fix CTE ID normalization for nested CTEs
6a91b1d063a8 is described below
commit 6a91b1d063a8e868830307bb0f0c4e8c076831c4
Author: Puneet Dixit <[email protected]>
AuthorDate: Tue May 26 11:06:01 2026 +0800
[SPARK-56921][SQL] Fix CTE ID normalization for nested CTEs
## What changes were proposed in this pull request?
This updates `NormalizeCTEIds` so normalization for the current `WithCTE`
scope does not cross into nested `WithCTE` nodes. Nested `WithCTE`s are
normalized separately by the normal top-down traversal, including when they are
reached through subquery expressions.
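Regression tests cover the reported temp-view + nested CTE + union
pattern and check that, for a recursive CTE, every normalized `UnionLoopRef`
still carries the same loop ID as its normalized `UnionLoop`.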
## Why are the changes needed?
The previous traversal (`transformDownWithSubqueries`) descended into nested
`WithCTE` nodes. Because `cteIdToNewId` is shared across the whole rule run, it
could therefore remap the same ref twice. When sibling `WithCTE`s sit under a
`Union`, the left side can map the inner CTE ID to a new ID. The right side can
then rewrite the same nested ref through that map. Finally, the outer traversal
can rewrite that new ID again if it is also a key in the map. The second rewrite
can point the ref at the wrong outer CTE definition, which later lets
`InlineCTE` substitute the wrong body.
## Does this PR introduce _any_ user-facing change?
No.
## How was this patch tested?
- `git diff --check origin/master HEAD -- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala`
- `JAVA_HOME=/opt/homebrew/opt/openjdk17 build/sbt 'sql/testOnly *CTEInlineSuiteAEOff *CTEInlineSuiteAEOn -- -z SPARK-56921'`
## Was this patch authored or co-authored using generative AI tooling?
Generated-by: OpenAI GPT-5
Closes #55985 from puneetdixit200/fix-spark-56921-normalize-nested-cte.
Authored-by: Puneet Dixit <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 037e9e6faa43b6fc62af271e5ed1cd12ee4676f2)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/catalyst/normalizer/NormalizeCTEIds.scala | 36 ++++++++----
.../org/apache/spark/sql/CTEInlineSuite.scala | 67 ++++++++++++++++++++++
2 files changed, 92 insertions(+), 11 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala
index 6c0bca0e1104..660f11d368ab 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect,
CTERelationRef, LogicalPlan, UnionLoop, UnionLoopRef, WithCTE}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -52,16 +53,29 @@ object NormalizeCTEIds extends Rule[LogicalPlan] {
private def canonicalizeCTE(
plan: LogicalPlan,
- defIdToNewId: mutable.Map[Long, Long]): LogicalPlan = {
- plan.transformDownWithSubqueries {
- // For nested WithCTE, if defIndex didn't contain the cteId,
- // means it's not current WithCTE's ref.
- case ref: CTERelationRef if defIdToNewId.contains(ref.cteId) =>
- ref.copy(cteId = defIdToNewId(ref.cteId))
- case unionLoop: UnionLoop if defIdToNewId.contains(unionLoop.id) =>
- unionLoop.copy(id = defIdToNewId(unionLoop.id))
- case unionLoopRef: UnionLoopRef if
defIdToNewId.contains(unionLoopRef.loopId) =>
- unionLoopRef.copy(loopId = defIdToNewId(unionLoopRef.loopId))
- }
+ defIdToNewId: mutable.Map[Long, Long]): LogicalPlan = plan match {
+ // Stop at nested WithCTEs because applyInternal canonicalizes each
WithCTE scope
+ // independently. Descending here would re-apply the shared cteIdToNewId
map to
+ // inner-scope refs and, under sibling WithCTEs, move them to the wrong CTE
+ // definition (SPARK-56921).
+ case _: WithCTE => plan
+ case other =>
+ val normalizedPlan = other match {
+ case ref: CTERelationRef if defIdToNewId.contains(ref.cteId) =>
+ ref.copy(cteId = defIdToNewId(ref.cteId))
+ case unionLoop: UnionLoop if defIdToNewId.contains(unionLoop.id) =>
+ unionLoop.copy(id = defIdToNewId(unionLoop.id))
+ case unionLoopRef: UnionLoopRef if
defIdToNewId.contains(unionLoopRef.loopId) =>
+ unionLoopRef.copy(loopId = defIdToNewId(unionLoopRef.loopId))
+ case _ =>
+ other
+ }
+
+ normalizedPlan
+ .withNewChildren(normalizedPlan.children.map(canonicalizeCTE(_,
defIdToNewId)))
+ .transformExpressionsDown {
+ case subqueryExpression: SubqueryExpression =>
+
subqueryExpression.withNewPlan(canonicalizeCTE(subqueryExpression.plan,
defIdToNewId))
+ }
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index 7562d5669cc2..3833b7f2509d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -261,6 +261,73 @@ abstract class CTEInlineSuiteBase
}
}
+ test("SPARK-56921: plan normalization handles nested CTEs under union") {
+ withTempView("input", "common") {
+ Seq((1, 1, 10), (1, 2, 20), (2, 1, 30))
+ .toDF("a", "b", "value")
+ .createOrReplaceTempView("input")
+
+ sql(
+ s"""with cte_common as (
+ | select a, b, sum(value) as value
+ | from input
+ | group by a, b
+ |)
+ |select * from cte_common
+ """.stripMargin).createOrReplaceTempView("common")
+
+ val left = sql(
+ s"""with cte_a as (
+ | select a, sum(value) as value
+ | from common
+ | group by a
+ |)
+ |select a as id, value from cte_a
+ """.stripMargin)
+
+ val right = sql(
+ s"""with cte_b as (
+ | select b, sum(value) as value
+ | from common
+ | group by b
+ |)
+ |select b as id, value from cte_b
+ """.stripMargin)
+
+ checkAnswer(
+ left.union(right),
+ Row(1, 30) :: Row(2, 30) :: Row(1, 40) :: Row(2, 20) :: Nil)
+ }
+ }
+
+ test("SPARK-56921: plan normalization preserves recursive CTE loop refs") {
+ val df = sql(
+ s"""with recursive t(n) as (
+ | select 1
+ | union all
+ | select n + 1 from t where n < 3
+ |)
+ |select * from t
+ """.stripMargin)
+
+ val normalized = df.queryExecution.normalized
+ val unionLoops = normalized.collect { case unionLoop: UnionLoop =>
unionLoop }
+
+ assert(unionLoops.nonEmpty, "Recursive CTE should normalize with a
UnionLoop.")
+ unionLoops.foreach { unionLoop =>
+ val unionLoopRefs = unionLoop.recursion.collect {
+ case unionLoopRef: UnionLoopRef => unionLoopRef
+ }
+
+ assert(unionLoopRefs.nonEmpty, "Recursive CTE should normalize with a
UnionLoopRef.")
+ assert(
+ unionLoopRefs.forall(_.loopId == unionLoop.id),
+ "UnionLoopRef loop IDs should match the normalized UnionLoop ID.")
+ }
+
+ checkAnswer(df, Row(1) :: Row(2) :: Row(3) :: Nil)
+ }
+
test("SPARK-36447: invalid nested CTEs") {
withTempView("t") {
Seq((0, 1), (1, 2)).toDF("c1", "c2").createOrReplaceTempView("t")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]