This is an automated email from the ASF dual-hosted git repository.
ptoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 721593ebc6c0 [SPARK-54864][SQL][FOLLOWUP] Fix rCTEs in NormalizePlan
721593ebc6c0 is described below
commit 721593ebc6c018cbb8111174e2b31505b78c37f7
Author: pavle-martinovic_data <[email protected]>
AuthorDate: Mon Jan 12 17:10:43 2026 +0100
[SPARK-54864][SQL][FOLLOWUP] Fix rCTEs in NormalizePlan
### What changes were proposed in this pull request?
Follow-up to #53636 that fixes a bug where CTE definitions were not
normalized properly.
### Why are the changes needed?
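The current implementation of plan normalization doesn't work for rCTEs:
`CteIdNormalizer.normalizeDef` checks `oldToNewIdMapping` for the
`CTERelationDef` itself, while the entries it adds are keyed by the
definition's id, so the lookup does not match them.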
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Changed the "Normalize rCTEs" test so that it fails without this change.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53753 from Pajaraja/pavle-martinovic_data/PlanNormalizationrCTEFix.
Authored-by: pavle-martinovic_data <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
---
.../spark/sql/catalyst/plans/NormalizePlan.scala | 2 +-
.../sql/catalyst/plans/NormalizePlanSuite.scala | 21 ++++++++++++++++++---
2 files changed, 19 insertions(+), 4 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
index b064cdba17cb..f7740cf558c1 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
@@ -253,7 +253,7 @@ class CteIdNormalizer {
def normalizeDef(cteRelationDef: CTERelationDef): CTERelationDef = {
try {
- if (oldToNewIdMapping.containsKey(cteRelationDef)) {
+ if (oldToNewIdMapping.containsKey(cteRelationDef.id)) {
cteRelationDef.copy(id = oldToNewIdMapping.get(cteRelationDef.id))
} else {
oldToNewIdMapping.put(cteRelationDef.id, cteIdCounter)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/NormalizePlanSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/NormalizePlanSuite.scala
index d72b2d3582d2..cbf7ce794880 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/NormalizePlanSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/NormalizePlanSuite.scala
@@ -247,7 +247,7 @@ class NormalizePlanSuite extends SparkFunSuite with
SQLConfHelper {
test("Normalize rCTEs") {
val col1 = $"col1".int
- val col2 = col1.newInstance()
+ val col2 = $"col2".int
val anchor = LocalRelation(col1)
// Create two full recursive CTEs - CTERelationDef with a UnionLoop and
UnionLoopRef with the
@@ -272,11 +272,26 @@ class NormalizePlanSuite extends SparkFunSuite with
SQLConfHelper {
id = 200L
)
+ val normalizedRecursiveCTE = CTERelationDef(
+ child = UnionLoop(
+ id = 0L,
+ anchor = LocalRelation(col1.withExprId(ExprId(0))),
+ recursion = UnionLoopRef(
+ loopId = 0L,
+ output = Seq(col2.withExprId(ExprId(0))),
+ accumulated = false
+ ),
+ outputAttrIds = Seq(ExprId(0), ExprId(0))
+ ),
+ id = 0L
+ )
+
// Before normalization, plans are different
assert(recursiveCTE1 != recursiveCTE2)
- // After normalization, they should be equal
- assert(NormalizePlan(recursiveCTE1) == NormalizePlan(recursiveCTE2))
+ // After normalization, they should be equal to the normalized plan
+ assert(NormalizePlan(recursiveCTE1) == normalizedRecursiveCTE)
+ assert(NormalizePlan(recursiveCTE2) == normalizedRecursiveCTE)
}
private def setTimezoneForAllExpression(plan: LogicalPlan): LogicalPlan = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]