This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8bd7789872b4 [SPARK-50722][SQL] Detect self-contained WITH nodes
8bd7789872b4 is described below
commit 8bd7789872b42c91fe9b3bbd73cc44fca865cf5c
Author: Peter Toth <[email protected]>
AuthorDate: Wed Jan 8 09:12:20 2025 +0800
[SPARK-50722][SQL] Detect self-contained WITH nodes
### What changes were proposed in this pull request?
This is a follow-up of https://github.com/apache/spark/pull/48284 to make
the CTE inline decision smarter without introducing overly complex lineage
tracking. This PR is an alternative fix to
https://github.com/apache/spark/pull/49352 that tracks CTE references in a
different way.
First of all, this PR reverts https://github.com/apache/spark/pull/48284
and keeps tracking the "contains" relation in `outgoingRefs`. An example of this
relation is "Case 2" in https://github.com/apache/spark/pull/49352, in which
`t2` contains `t3`. While https://github.com/apache/spark/pull/48284
stores the reference from `t2` -> `t3` in `t2`'s `indirectOutgoingRefSources`,
this PR goes back to storing the reference in `t2`'s `outgoingRefs`, like any
other reference.
Obviously, the problem with this revert is that queries with duplicate CTE
definitions can fail, so let's revisit the problematic query from
https://github.com/apache/spark/pull/48284:
```
WithCTE
  CTEDef r0
    ...
  CTEDef r1
    View v
      WithCTE
        CTEDef t1
          OneRowRelation
        CTEDef t2
          CTERef t1
        CTERef t2    // main query of the inner WithCTE
  CTEDef r2
    View v    // exactly the same as the view v above
      WithCTE
        CTEDef t1
          OneRowRelation
        CTEDef t2
          CTERef t1
        CTERef t2
  CTERef r2    // main query of the outer WithCTE
```
The source of the problem is view `v`, which duplicates a `WithCTE` node
and introduces conflicting CTE definitions.
In the above example, tracking both the `r1` -> `t2` and the `r2` -> `t2`
"contains" relations in `r1`'s and `r2`'s `outgoingRefs` causes reference
counting problems, as those `t2`s are actually different instances and so they
can be inlined differently depending on how many times `r1` and `r2` are
referenced.
Fortunately, this duplicate issue can only happen when the conflicting
`WithCTE` nodes are self-contained (as those nodes can't contain any references
to any outer definitions like `r0`). The main advantage of self-contained
`WithCTE` nodes is that we don't need to track the `r1` -> `t2` and `r2` ->
`t2` references, as the inlining of `t2` (and `t1`) inside the `WithCTE` node
doesn't depend on references coming from the outer (`r1`, `r2`) definitions. (Please
note that what depends on `r1` [...]
So to recap:
- we do need to track references from the "contains" relation when the inner
`WithCTE` is not self-contained
- we don't need to (and it actually causes issues if we do) when the inner
`WithCTE` is self-contained and the node appears multiple times in the query
- if the inner `WithCTE` is self-contained but appears only once in the
query, then both approaches are fine
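To make "self-contained" concrete, here is a minimal Scala sketch on a toy plan model. The names `Node`, `OneRow`, `CteRef`, `CteDef`, `WithNode` and `SelfContained` are hypothetical stand-ins for illustration, not Catalyst classes, and this PR does not compute the property up front. The sketch only shows one way to read the definition: every reference inside the node resolves to a definition that is also inside the node.
```scala
// Hypothetical toy plan nodes, assumed for illustration only.
sealed trait Node
case object OneRow extends Node
case class CteRef(id: Long) extends Node
case class CteDef(id: Long, body: Node)
case class WithNode(main: Node, defs: Seq[CteDef]) extends Node

object SelfContained {
  // Ids of all CTE definitions declared anywhere inside `n`.
  def definedIds(n: Node): Set[Long] = n match {
    case WithNode(main, defs) =>
      definedIds(main) ++ defs.flatMap(d => definedIds(d.body) + d.id)
    case _ => Set.empty
  }

  // Ids of all CTE definitions referenced anywhere inside `n`.
  def referencedIds(n: Node): Set[Long] = n match {
    case CteRef(id) => Set(id)
    case WithNode(main, defs) =>
      referencedIds(main) ++ defs.flatMap(d => referencedIds(d.body))
    case OneRow => Set.empty
  }

  // Self-contained: no reference escapes to an outer definition.
  def isSelfContained(w: WithNode): Boolean =
    referencedIds(w).subsetOf(definedIds(w))

  // The inner WithCTE of view `v` above (t1 = 10, t2 = 20).
  val viewBody: WithNode =
    WithNode(CteRef(20), Seq(CteDef(10, OneRow), CteDef(20, CteRef(10))))

  // An inner WithCTE whose definitions reference an outer definition (id 1).
  val usesOuter: WithNode =
    WithNode(CteRef(20), Seq(CteDef(10, CteRef(1)), CteDef(20, CteRef(1))))
}
```
In this model `SelfContained.isSelfContained(SelfContained.viewBody)` is true, while `usesOuter` is not self-contained because it references id 1, which is defined outside of it.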
This PR suggests the following:
- As during the traversal of `buildCTEMap()` we don't know in advance if a
  `WithCTE` node is self-contained or not, let's:
  - track the references from "contains" relations
  - and also record the container of a definition.
  E.g. when we encounter `t2` in `r1`, then record `t2` in `r1`'s
  `outgoingRefs` and record `r1` as the container of `t2`.
- If we encounter a duplicate definition, then we know it is self-contained
  so we:
  - don't need to visit it again
  - and we can also remove the reference from its container to the
    definition.
  E.g. when we encounter `t2` in `r2`, then clear `t2` from `r1`'s
  `outgoingRefs` (we know that `r1` was `t2`'s container).
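The two steps above can be sketched on a toy plan model. This is a simplified illustration under stated assumptions, not the actual `InlineCTE` code: `Plan`, `Leaf`, `Ref`, `Def`, `With`, `Union`, `Info` and `CteMapSketch` are hypothetical names, subquery expressions are ignored, and `refCount` is a plain mutable field rather than a copied case class field.
```scala
import scala.collection.mutable

// Hypothetical toy plan nodes, assumed for illustration only.
sealed trait Plan
case object Leaf extends Plan
case class Ref(cteId: Long) extends Plan
case class Def(id: Long, child: Plan)
case class With(child: Plan, defs: Seq[Def]) extends Plan
case class Union(left: Plan, right: Plan) extends Plan

// Per-definition bookkeeping; `container` is the enclosing definition, if any.
final class Info(val container: Option[Long]) {
  var refCount: Int = 0
  val outgoingRefs: mutable.Map[Long, Int] =
    mutable.Map.empty[Long, Int].withDefaultValue(0)
}

object CteMapSketch {
  def build(
      plan: Plan,
      cteMap: mutable.Map[Long, Info],
      outer: Option[Long] = None): Unit = plan match {
    case With(child, defs) =>
      if (defs.forall(d => cteMap.contains(d.id))) {
        // Seen before, so it must be self-contained: drop the
        // container -> definition references and skip the subtree.
        defs.foreach { d =>
          cteMap(d.id).container.foreach(c => cteMap(c).outgoingRefs -= d.id)
        }
      } else {
        defs.foreach { d => cteMap(d.id) = new Info(outer) }
        defs.foreach(d => build(d.child, cteMap, Some(d.id)))
        build(child, cteMap, outer)
      }
    case Ref(id) =>
      cteMap(id).refCount += 1
      // Covers both "references" and "contains" relations of `outer`.
      outer.foreach(o => cteMap(o).outgoingRefs(id) += 1)
    case Union(l, r) =>
      build(l, cteMap, outer)
      build(r, cteMap, outer)
    case Leaf =>
  }

  // The problematic query above with r1 = 1, r2 = 2, t1 = 10, t2 = 20.
  def demo(): mutable.Map[Long, Info] = {
    val view = With(Ref(20), Seq(Def(10, Leaf), Def(20, Ref(10))))
    val query = With(Ref(2), Seq(Def(1, view), Def(2, view)))
    val cteMap = mutable.Map.empty[Long, Info]
    build(query, cteMap)
    cteMap
  }
}
```
In `demo()`, visiting `r1` first records `r1` -> `t2` in `r1`'s `outgoingRefs`. When the same inner node shows up again under `r2`, it is recognized as a duplicate, so that entry is removed and neither `r1` nor `r2` ends up with an outgoing reference to `t2`. The `t2` -> `t1` reference inside the node is kept.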
### Why are the changes needed?
Make sure we can inline CTEs when we should.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added tests from https://github.com/apache/spark/pull/49352.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49379 from peter-toth/SPARK-50722-cte-fix.
Lead-authored-by: Peter Toth <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/optimizer/InlineCTE.scala | 74 +++++++++++--------
.../org/apache/spark/sql/CTEInlineSuite.scala | 85 +++++++++++++++++++++-
2 files changed, 127 insertions(+), 32 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
index b3384c4e2956..ad1a1a99b825 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
@@ -71,53 +71,64 @@ case class InlineCTE(
* @param plan The plan to collect the CTEs from
* @param cteMap A mutable map that accumulates the CTEs and their reference
information by CTE
* ids.
- * @param collectCTERefs A function to collect CTE references so that the
caller side can do some
- * bookkeeping work.
+ * @param outerCTEId While collecting the map we use this optional CTE id to
identify the
+ * current outer CTE.
*/
private def buildCTEMap(
plan: LogicalPlan,
cteMap: mutable.Map[Long, CTEReferenceInfo],
- collectCTERefs: CTERelationRef => Unit = _ => ()): Unit = {
+ outerCTEId: Option[Long] = None): Unit = {
plan match {
case WithCTE(child, cteDefs) =>
- cteDefs.foreach { cteDef =>
- cteMap(cteDef.id) = CTEReferenceInfo(
- cteDef = cteDef,
- refCount = 0,
- outgoingRefs = mutable.Map.empty.withDefaultValue(0),
- shouldInline = true
- )
- }
- cteDefs.foreach { cteDef =>
- buildCTEMap(cteDef, cteMap, ref => {
- // A CTE relation can references CTE relations defined before it
in the same `WithCTE`.
- // Here we update the out-going-ref-count for it, in case this CTE
relation is not
- // referenced at all and can be optimized out, and we need to
decrease the ref counts
- // for CTE relations that are referenced by it.
- if (cteDefs.exists(_.id == ref.cteId)) {
- cteMap(cteDef.id).increaseOutgoingRefCount(ref.cteId, 1)
- }
- // Similarly, a CTE relation can reference CTE relations defined
in the outer `WithCTE`.
- // Here we call the `collectCTERefs` function so that the outer
CTE can also update the
- // out-going-ref-count if needed.
- collectCTERefs(ref)
- })
+ val isDuplicated = cteDefs.forall(cteDef => cteMap.contains(cteDef.id))
+ if (isDuplicated) {
+ // If we have seen this `WithCTE` node then it must be
self-contained so we can clear
+ // the references from containers to the definitions, and we don't
need to process it
+ // again
+
+ cteDefs.foreach { cteDef =>
+ cteMap(cteDef.id).container.foreach(c => cteMap(c).outgoingRefs -=
cteDef.id)
+ }
+ } else {
+ cteDefs.foreach { cteDef =>
+ cteMap(cteDef.id) = CTEReferenceInfo(
+ cteDef = cteDef,
+ refCount = 0,
+ outgoingRefs = mutable.Map.empty.withDefaultValue(0),
+ shouldInline = true,
+ container = outerCTEId
+ )
+ }
+
+ cteDefs.foreach { cteDef =>
+ buildCTEMap(cteDef, cteMap, Some(cteDef.id))
+ }
+ buildCTEMap(child, cteMap, outerCTEId)
}
- buildCTEMap(child, cteMap, collectCTERefs)
case ref: CTERelationRef =>
cteMap(ref.cteId) = cteMap(ref.cteId).withRefCountIncreased(1)
- collectCTERefs(ref)
+
+ // The `outerCTEId` CTE definition can either reference `cteId`
definition if `cteId` is in
+ // the same or in an outer `WithCTE` node, or `outerCTEId` can contain
`cteId` definition if
+ // `cteId` is an inner `WithCTE` node inside `outerCTEId`.
+ // In both cases we can track the relations in `outgoingRefs` when we
see a definition the
+ // first time. But if we encounter a conflicting duplicated contains
relation later, then we
+ // will remove the references of the first contains relation.
+ outerCTEId.foreach { cteId =>
+ cteMap(cteId).increaseOutgoingRefCount(ref.cteId, 1)
+ }
+
case _ =>
if (plan.containsPattern(CTE)) {
plan.children.foreach { child =>
- buildCTEMap(child, cteMap, collectCTERefs)
+ buildCTEMap(child, cteMap, outerCTEId)
}
plan.expressions.foreach { expr =>
if (expr.containsAllPatterns(PLAN_EXPRESSION, CTE)) {
expr.foreach {
- case e: SubqueryExpression => buildCTEMap(e.plan, cteMap,
collectCTERefs)
+ case e: SubqueryExpression => buildCTEMap(e.plan, cteMap,
outerCTEId)
case _ =>
}
}
@@ -225,12 +236,15 @@ case class InlineCTE(
* from other CTE relations and regular places.
* @param outgoingRefs A mutable map that tracks outgoing reference counts to
other CTE relations.
* @param shouldInline If true, this CTE relation should be inlined in the
places that reference it.
+ * @param container The container of a CTE definition is another CTE
definition in which the
+ * `WithCTE` node of the definition resides.
*/
case class CTEReferenceInfo(
cteDef: CTERelationDef,
refCount: Int,
outgoingRefs: mutable.Map[Long, Int],
- shouldInline: Boolean) {
+ shouldInline: Boolean,
+ container: Option[Long]) {
def withRefCountIncreased(count: Int): CTEReferenceInfo = {
copy(refCount = refCount + count)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index f22d90d9f35d..e8b9ffe28494 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql
-import org.apache.spark.sql.catalyst.expressions.{And, GreaterThan, LessThan,
Literal, Or}
+import org.apache.spark.sql.catalyst.expressions.{Alias, And, GreaterThan,
LessThan, Literal, Or, Rand}
+import org.apache.spark.sql.catalyst.optimizer.InlineCTE
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.adaptive._
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
@@ -715,7 +716,7 @@ abstract class CTEInlineSuiteBase
checkAnswer(df, Row(1))
}
- test("SPARK-49816: should only update out-going-ref-count for referenced
outer CTE relation") {
+ test("SPARK-49816: detect self-contained WithCTE nodes") {
withView("v") {
sql(
"""
@@ -735,6 +736,86 @@ abstract class CTEInlineSuiteBase
checkAnswer(df, Row(1))
}
}
+
+ test("SPARK-49816: complicated reference count") {
+ // Manually build the logical plan for
+ // WITH
+ // r1 AS (SELECT random()),
+ // r2 AS (
+ // WITH
+ // t1 AS (SELECT * FROM r1),
+ // t2 AS (SELECT * FROM r1)
+ // SELECT * FROM t2
+ // )
+ // SELECT * FROM r2
+ // r1 should be inlined as it's only referenced once: main query -> r2 ->
t2 -> r1
+ val r1 = CTERelationDef(Project(Seq(Alias(Rand(Literal(0)), "r")()),
OneRowRelation()))
+ val r1Ref = CTERelationRef(r1.id, r1.resolved, r1.output, r1.isStreaming)
+ val t1 = CTERelationDef(Project(r1.output, r1Ref))
+ val t2 = CTERelationDef(Project(r1.output, r1Ref))
+ val t2Ref = CTERelationRef(t2.id, t2.resolved, t2.output, t2.isStreaming)
+ val r2 = CTERelationDef(WithCTE(Project(t2.output, t2Ref), Seq(t1, t2)))
+ val r2Ref = CTERelationRef(r2.id, r2.resolved, r2.output, r2.isStreaming)
+ val query = WithCTE(Project(r2.output, r2Ref), Seq(r1, r2))
+ val inlined = InlineCTE().apply(query)
+ assert(!inlined.exists(_.isInstanceOf[WithCTE]))
+ }
+
+ test("SPARK-49816: complicated reference count 2") {
+ // Manually build the logical plan for
+ // WITH
+ // r1 AS (SELECT random()),
+ // r2 AS (
+ // WITH
+ // t1 AS (SELECT * FROM r1),
+ // t2 AS (SELECT * FROM t1)
+ // SELECT * FROM t2
+ // )
+ // SELECT * FROM r1
+ // This is similar to the previous test case, but t2 reference t1 instead
of r1, and the main
+ // query references r1. r1 should be inlined as r2 is not referenced at
all.
+ val r1 = CTERelationDef(Project(Seq(Alias(Rand(Literal(0)), "r")()),
OneRowRelation()))
+ val r1Ref = CTERelationRef(r1.id, r1.resolved, r1.output, r1.isStreaming)
+ val t1 = CTERelationDef(Project(r1.output, r1Ref))
+ val t1Ref = CTERelationRef(t1.id, t1.resolved, t1.output, t1.isStreaming)
+ val t2 = CTERelationDef(Project(t1.output, t1Ref))
+ val t2Ref = CTERelationRef(t2.id, t2.resolved, t2.output, t2.isStreaming)
+ val r2 = CTERelationDef(WithCTE(Project(t2.output, t2Ref), Seq(t1, t2)))
+ val query = WithCTE(Project(r1.output, r1Ref), Seq(r1, r2))
+ val inlined = InlineCTE().apply(query)
+ assert(!inlined.exists(_.isInstanceOf[WithCTE]))
+ }
+
+ test("SPARK-49816: complicated reference count 3") {
+ // Manually build the logical plan for
+ // WITH
+ // r1 AS (
+ // WITH
+ // t1 AS (SELECT random()),
+ // t2 AS (SELECT * FROM t1)
+ // SELECT * FROM t2
+ // ),
+ // r2 AS (
+ // WITH
+ // t1 AS (SELECT random()),
+ // t2 AS (SELECT * FROM r1)
+ // SELECT * FROM t2
+ // )
+ // SELECT * FROM r1 UNION ALL SELECT * FROM r2
+ // The inner WITH in r1 and r2 should become `SELECT random()` and r1/r2
should be inlined.
+ val t1 = CTERelationDef(Project(Seq(Alias(Rand(Literal(0)), "r")()),
OneRowRelation()))
+ val t1Ref = CTERelationRef(t1.id, t1.resolved, t1.output, t1.isStreaming)
+ val t2 = CTERelationDef(Project(t1.output, t1Ref))
+ val t2Ref = CTERelationRef(t2.id, t2.resolved, t2.output, t2.isStreaming)
+ val cte = WithCTE(Project(t2.output, t2Ref), Seq(t1, t2))
+ val r1 = CTERelationDef(cte)
+ val r1Ref = CTERelationRef(r1.id, r1.resolved, r1.output, r1.isStreaming)
+ val r2 = CTERelationDef(cte)
+ val r2Ref = CTERelationRef(r2.id, r2.resolved, r2.output, r2.isStreaming)
+ val query = WithCTE(Union(r1Ref, r2Ref), Seq(r1, r2))
+ val inlined = InlineCTE().apply(query)
+ assert(!inlined.exists(_.isInstanceOf[WithCTE]))
+ }
}
class CTEInlineSuiteAEOff extends CTEInlineSuiteBase with
DisableAdaptiveExecutionSuite