This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e0c9ddcec9ae [SPARK-50743][SQL] Normalize CTERelationDef and
CTERelationRef IDs
e0c9ddcec9ae is described below
commit e0c9ddcec9aee5c9ca12ef5ee731d977ff2e4f09
Author: Vladimir Golubev <[email protected]>
AuthorDate: Wed Jan 8 12:58:53 2025 +0800
[SPARK-50743][SQL] Normalize CTERelationDef and CTERelationRef IDs
### What changes were proposed in this pull request?
Normalize CTERelationDef and CTERelationRef IDs.
### Why are the changes needed?
This is necessary so that the output of single-pass Analyzer containing
CTEs can be compared to the fixed-point one.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49377 from vladimirg-db/vladimirg-db/normalize-cte-def-ref-ids.
Authored-by: Vladimir Golubev <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/plans/NormalizePlan.scala | 33 ++++++++++++++++++++++
1 file changed, 33 insertions(+)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
index 38cf2730e9ac..d7ba596cf399 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.plans
+import java.util.HashMap
+
import org.apache.spark.sql.catalyst.analysis.GetViewColumnByNameAndOrdinal
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -68,8 +70,13 @@ object NormalizePlan extends PredicateHelper {
* etc., will all now be equivalent.
* - Sample the seed will replaced by 0L.
* - Join conditions will be resorted by hashCode.
+ * - CTERelationDef ids will be rewritten using a monotonically increasing
counter from 0.
+ * - CTERelationRef ids will be remapped based on the new CTERelationDef
IDs. This is possible,
+ * because WithCTE returns cteDefs as first children, and the defs will be
traversed before the
+ * refs.
*/
def normalizePlan(plan: LogicalPlan): LogicalPlan = {
+ val cteIdNormalizer = new CteIdNormalizer
plan transform {
case Filter(condition: Expression, child: LogicalPlan) =>
Filter(
@@ -113,6 +120,10 @@ object NormalizePlan extends PredicateHelper {
localRelation.copy(data = localRelation.data.map { row =>
unsafeProjection(row)
})
+ case cteRelationDef: CTERelationDef =>
+ cteIdNormalizer.normalizeDef(cteRelationDef)
+ case cteRelationRef: CTERelationRef =>
+ cteIdNormalizer.normalizeRef(cteRelationRef)
}
}
@@ -143,3 +154,25 @@ object NormalizePlan extends PredicateHelper {
case _ => condition // Don't reorder.
}
}
+
+/** Renumbers CTE ids: defs get 0, 1, 2, ... in call order; refs reuse the recorded mapping. */
+class CteIdNormalizer {
+  private var nextId: Long = 0
+  private val idRemapping = new HashMap[Long, Long]
+
+  /** Gives the def the next counter value and records old id -> new id for later refs. */
+  def normalizeDef(cteRelationDef: CTERelationDef): CTERelationDef = {
+    val newId = nextId
+    nextId += 1 // advanced on every call, so each normalized def gets a distinct id
+    idRemapping.put(cteRelationDef.id, newId)
+    cteRelationDef.copy(id = newId)
+  }
+
+  /** Rewrites the ref's cteId when its old id was recorded; otherwise returns it untouched. */
+  def normalizeRef(cteRelationRef: CTERelationRef): CTERelationRef = {
+    val oldId = cteRelationRef.cteId
+    if (!idRemapping.containsKey(oldId)) cteRelationRef
+    else cteRelationRef.copy(cteId = idRemapping.get(oldId))
+  }
+}
+// Note: a ref normalized before its def (no mapping recorded yet) keeps its original cteId.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]