This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bd7789872b4 [SPARK-50722][SQL] Detect self-contained WITH nodes
8bd7789872b4 is described below

commit 8bd7789872b42c91fe9b3bbd73cc44fca865cf5c
Author: Peter Toth <[email protected]>
AuthorDate: Wed Jan 8 09:12:20 2025 +0800

    [SPARK-50722][SQL] Detect self-contained WITH nodes
    
    ### What changes were proposed in this pull request?
    This is a follow-up of https://github.com/apache/spark/pull/48284 to make the CTE inline decision smarter without introducing overly complex lineage tracking. This PR is an alternative fix to https://github.com/apache/spark/pull/49352 that tracks CTE references in a different way.
    
    First of all, this PR reverts https://github.com/apache/spark/pull/48284 and keeps tracking the "contains" relation in `outgoingRefs`. An example of this relation is "Case 2" in https://github.com/apache/spark/pull/49352, in which `t2` contains `t3`. But while https://github.com/apache/spark/pull/48284 stores the reference `t2` -> `t3` in `t2`'s `indirectOutgoingRefSources`, this PR goes back to storing the reference in `t2`'s `outgoingRefs`, like any other usual reference.
    
    Obviously, the problem with this revert is that queries with duplicate CTE definitions can fail, so let's revisit the problematic query from https://github.com/apache/spark/pull/48284:
    ```
    WithCTE
      CTEDef r0
        ...
      CTEDef r1
        View v
          WithCTE
            CTEDef t1
              OneRowRelation
            CTEDef t2
              CTERef t1
            CTERef t2    // main query of the inner WithCTE
      CTEDef r2
        View v   // exactly the same as the view v above
          WithCTE
            CTEDef t1
              OneRowRelation
            CTEDef t2
              CTERef t1
            CTERef t2
      CTERef r2    // main query of the outer WithCTE
    ```
    The source of the problem is view `v`, which duplicates a `WithCTE` node and introduces conflicting CTE definitions.
    In the above example, tracking both the `r1` -> `t2` and the `r2` -> `t2` "contains" relations in `r1`'s and `r2`'s `outgoingRefs` causes reference counting problems, as those `t2`s are actually different instances, so their inlining can differ depending on how many times `r1` and `r2` are referenced.
    
    Fortunately, this duplicate issue can only happen when the conflicting `WithCTE` nodes are self-contained (as those nodes can't contain any references to outer definitions like `r0`). The main advantage of self-contained `WithCTE` nodes is that we don't need to track the `r1` -> `t2` and `r2` -> `t2` references, as the inlining of `t2` (and `t1`) inside the `WithCTE` node doesn't depend on references coming from the outer (`r1`, `r2`) definitions. (Please note that what depends on `r1` [...]
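Self-containment can be phrased as a simple check: every reference inside the `WithCTE` node, including references inside nested `WithCTE` nodes, points at a definition introduced inside that node. The hedged sketch below uses a hypothetical toy plan model (not Spark's classes) and made-up ids. Note that the PR itself does not run such a check; it infers self-containment from encountering the same definitions a second time.

```scala
// Hypothetical mini plan model for illustration only (not Spark's classes).
sealed trait Plan
case class Def(id: Long, child: Plan)
case class With(child: Plan, defs: Seq[Def]) extends Plan
case class Ref(id: Long) extends Plan
case class Node(children: Seq[Plan]) extends Plan
case object Leaf extends Plan

// All CTE ids defined anywhere inside `p`, including nested WITH nodes.
def definedIds(p: Plan): Set[Long] = p match {
  case With(child, defs) =>
    defs.map(_.id).toSet ++ defs.flatMap(d => definedIds(d.child)) ++ definedIds(child)
  case Node(cs) => cs.flatMap(definedIds).toSet
  case _        => Set.empty
}

// All CTE ids referenced anywhere inside `p`.
def referencedIds(p: Plan): Set[Long] = p match {
  case With(child, defs) => defs.flatMap(d => referencedIds(d.child)).toSet ++ referencedIds(child)
  case Node(cs)          => cs.flatMap(referencedIds).toSet
  case Ref(id)           => Set(id)
  case Leaf              => Set.empty
}

// Self-contained: nothing inside the WITH node points at an outer definition.
def isSelfContained(w: With): Boolean = referencedIds(w).subsetOf(definedIds(w))

// The view body from the example: t1 = 10, t2 = 11.
val viewV = With(Ref(11), Seq(Def(10, Leaf), Def(11, Ref(10))))
// A variant whose t1 reads from an outer definition r0 = 0.
val dependsOnR0 = With(Ref(11), Seq(Def(10, Ref(0)), Def(11, Ref(10))))

println(isSelfContained(viewV))       // true
println(isSelfContained(dependsOnR0)) // false
```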
    
    So to recap:
    - we do need to track references from the "contains" relation when the inner `WithCTE` is not self-contained
    - we don't need to (and it actually causes issues if we do) when the inner `WithCTE` is self-contained and the node appears multiple times in the query
    - if the inner `WithCTE` is self-contained but appears only once in the query, then both ways are fine
    
    This PR suggests the following:
    - Since during the traversal of `buildCTEMap()` we don't know in advance whether a `WithCTE` node is self-contained, let's:
      - track the references from "contains" relations
      - and also record the container of a definition.
    
      E.g. when we encounter `t2` in `r1`, we record `t2` in `r1`'s `outgoingRefs` and record `r1` as the container of `t2`.
    
    - If we encounter a duplicate definition, then we know it is self-contained, so we:
      - don't need to visit it again
      - and can also remove the reference from its container to the definition.
    
      E.g. when we encounter `t2` in `r2`, we clear `t2` from `r1`'s `outgoingRefs` (we know that `r1` was `t2`'s container).
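The proposed bookkeeping can be sketched on a hypothetical toy plan model (`Plan`, `Def`, `With`, `Ref`, `Leaf` and `Info` are illustrative stand-ins, not Spark's `LogicalPlan` or `CTEReferenceInfo`). It follows the same steps as the `buildCTEMap()` change in the diff below: record each definition's container on the first visit, count every reference in the enclosing definition's `outgoingRefs`, and, when a `WithCTE` node shows up again, drop its container's edges instead of revisiting it. This is a simplified sketch, not the actual implementation.

```scala
import scala.collection.mutable

// Hypothetical mini plan model for illustration only (not Spark's classes).
sealed trait Plan
case class Def(id: Long, child: Plan)
case class With(child: Plan, defs: Seq[Def]) extends Plan
case class Ref(id: Long) extends Plan
case object Leaf extends Plan

// Simplified stand-in for CTEReferenceInfo: no cteDef / shouldInline fields.
case class Info(refCount: Int, outgoingRefs: mutable.Map[Long, Int], container: Option[Long])

def buildMap(plan: Plan, m: mutable.Map[Long, Info], outer: Option[Long] = None): Unit =
  plan match {
    case With(child, defs) =>
      if (defs.forall(d => m.contains(d.id))) {
        // Seen before, so the node is self-contained: drop the edges that the first
        // visit recorded from each definition's container, and do not descend again.
        defs.foreach(d => m(d.id).container.foreach(c => m(c).outgoingRefs -= d.id))
      } else {
        // First visit: register every definition and remember its container.
        defs.foreach { d =>
          m(d.id) = Info(0, mutable.Map.empty[Long, Int].withDefaultValue(0), outer)
        }
        defs.foreach(d => buildMap(d.child, m, Some(d.id)))
        buildMap(child, m, outer)
      }
    case Ref(id) =>
      m(id) = m(id).copy(refCount = m(id).refCount + 1)
      // A plain reference and a "contains" relation are tracked the same way.
      outer.foreach(o => m(o).outgoingRefs(id) += 1)
    case Leaf => ()
  }

// The plan from the example: r0 = 0, r1 = 1, r2 = 2, t1 = 10, t2 = 11 (made-up ids).
val viewV = With(Ref(11), Seq(Def(10, Leaf), Def(11, Ref(10))))
val query = With(Ref(2), Seq(Def(0, Leaf), Def(1, viewV), Def(2, viewV)))

val m = mutable.Map.empty[Long, Info]
buildMap(query, m)

println(m(1L).outgoingRefs.isEmpty) // true: r1 -> t2 was cleared on the second visit
println(m(11L).refCount)            // 1
println(m(11L).outgoingRefs.toMap)  // Map(10 -> 1)
```

On the example plan, `r1`'s `outgoingRefs` ends up empty once the second copy of `v` is reached through `r2`, while `t2` keeps a single reference count and its `t2` -> `t1` edge. The sketch leaves out what the real rule also handles, such as subquery expressions, the `CTE` tree-pattern pruning visible in the diff, and the subsequent inlining decision.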
    
    ### Why are the changes needed?
    Make sure CTEs are inlined when they should be.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added tests from https://github.com/apache/spark/pull/49352.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #49379 from peter-toth/SPARK-50722-cte-fix.
    
    Lead-authored-by: Peter Toth <[email protected]>
    Co-authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/optimizer/InlineCTE.scala   | 74 +++++++++++--------
 .../org/apache/spark/sql/CTEInlineSuite.scala      | 85 +++++++++++++++++++++-
 2 files changed, 127 insertions(+), 32 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
index b3384c4e2956..ad1a1a99b825 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
@@ -71,53 +71,64 @@ case class InlineCTE(
    * @param plan The plan to collect the CTEs from
    * @param cteMap A mutable map that accumulates the CTEs and their reference information by CTE
    *               ids.
-   * @param collectCTERefs A function to collect CTE references so that the caller side can do some
-   *                       bookkeeping work.
+   * @param outerCTEId While collecting the map we use this optional CTE id to identify the
+   *                   current outer CTE.
    */
   private def buildCTEMap(
       plan: LogicalPlan,
       cteMap: mutable.Map[Long, CTEReferenceInfo],
-      collectCTERefs: CTERelationRef => Unit = _ => ()): Unit = {
+      outerCTEId: Option[Long] = None): Unit = {
     plan match {
       case WithCTE(child, cteDefs) =>
-        cteDefs.foreach { cteDef =>
-          cteMap(cteDef.id) = CTEReferenceInfo(
-            cteDef = cteDef,
-            refCount = 0,
-            outgoingRefs = mutable.Map.empty.withDefaultValue(0),
-            shouldInline = true
-          )
-        }
-        cteDefs.foreach { cteDef =>
-          buildCTEMap(cteDef, cteMap, ref => {
-            // A CTE relation can references CTE relations defined before it in the same `WithCTE`.
-            // Here we update the out-going-ref-count for it, in case this CTE relation is not
-            // referenced at all and can be optimized out, and we need to decrease the ref counts
-            // for CTE relations that are referenced by it.
-            if (cteDefs.exists(_.id == ref.cteId)) {
-              cteMap(cteDef.id).increaseOutgoingRefCount(ref.cteId, 1)
-            }
-            // Similarly, a CTE relation can reference CTE relations defined in the outer `WithCTE`.
-            // Here we call the `collectCTERefs` function so that the outer CTE can also update the
-            // out-going-ref-count if needed.
-            collectCTERefs(ref)
-          })
+        val isDuplicated = cteDefs.forall(cteDef => cteMap.contains(cteDef.id))
+        if (isDuplicated) {
+          // If we have seen this `WithCTE` node then it must be self-contained so we can clear
+          // the references from containers to the definitions, and we don't need to process it
+          // again
+
+          cteDefs.foreach { cteDef =>
+            cteMap(cteDef.id).container.foreach(c => cteMap(c).outgoingRefs -= cteDef.id)
+          }
+        } else {
+          cteDefs.foreach { cteDef =>
+            cteMap(cteDef.id) = CTEReferenceInfo(
+              cteDef = cteDef,
+              refCount = 0,
+              outgoingRefs = mutable.Map.empty.withDefaultValue(0),
+              shouldInline = true,
+              container = outerCTEId
+            )
+          }
+
+          cteDefs.foreach { cteDef =>
+            buildCTEMap(cteDef, cteMap, Some(cteDef.id))
+          }
+          buildCTEMap(child, cteMap, outerCTEId)
         }
-        buildCTEMap(child, cteMap, collectCTERefs)
 
       case ref: CTERelationRef =>
         cteMap(ref.cteId) = cteMap(ref.cteId).withRefCountIncreased(1)
-        collectCTERefs(ref)
+
+        // The `outerCTEId` CTE definition can either reference `cteId` definition if `cteId` is in
+        // the same or in an outer `WithCTE` node, or `outerCTEId` can contain `cteId` definition if
+        // `cteId` is an inner `WithCTE` node inside `outerCTEId`.
+        // In both cases we can track the relations in `outgoingRefs` when we see a definition the
+        // first time. But if we encounter a conflicting duplicated contains relation later, then we
+        // will remove the references of the first contains relation.
+        outerCTEId.foreach { cteId =>
+          cteMap(cteId).increaseOutgoingRefCount(ref.cteId, 1)
+        }
+
       case _ =>
         if (plan.containsPattern(CTE)) {
           plan.children.foreach { child =>
-            buildCTEMap(child, cteMap, collectCTERefs)
+            buildCTEMap(child, cteMap, outerCTEId)
           }
 
           plan.expressions.foreach { expr =>
             if (expr.containsAllPatterns(PLAN_EXPRESSION, CTE)) {
               expr.foreach {
-                case e: SubqueryExpression => buildCTEMap(e.plan, cteMap, collectCTERefs)
+                case e: SubqueryExpression => buildCTEMap(e.plan, cteMap, outerCTEId)
                 case _ =>
               }
             }
@@ -225,12 +236,15 @@ case class InlineCTE(
  *                 from other CTE relations and regular places.
  * @param outgoingRefs A mutable map that tracks outgoing reference counts to other CTE relations.
  * @param shouldInline If true, this CTE relation should be inlined in the places that reference it.
+ * @param container The container of a CTE definition is another CTE definition in which the
+ *                  `WithCTE` node of the definition resides.
  */
 case class CTEReferenceInfo(
     cteDef: CTERelationDef,
     refCount: Int,
     outgoingRefs: mutable.Map[Long, Int],
-    shouldInline: Boolean) {
+    shouldInline: Boolean,
+    container: Option[Long]) {
 
   def withRefCountIncreased(count: Int): CTEReferenceInfo = {
     copy(refCount = refCount + count)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index f22d90d9f35d..e8b9ffe28494 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.expressions.{And, GreaterThan, LessThan, Literal, Or}
+import org.apache.spark.sql.catalyst.expressions.{Alias, And, GreaterThan, LessThan, Literal, Or, Rand}
+import org.apache.spark.sql.catalyst.optimizer.InlineCTE
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.adaptive._
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
@@ -715,7 +716,7 @@ abstract class CTEInlineSuiteBase
     checkAnswer(df, Row(1))
   }
 
-  test("SPARK-49816: should only update out-going-ref-count for referenced outer CTE relation") {
+  test("SPARK-49816: detect self-contained WithCTE nodes") {
     withView("v") {
       sql(
         """
@@ -735,6 +736,86 @@ abstract class CTEInlineSuiteBase
       checkAnswer(df, Row(1))
     }
   }
+
+  test("SPARK-49816: complicated reference count") {
+    // Manually build the logical plan for
+    // WITH
+    //  r1 AS (SELECT random()),
+    //  r2 AS (
+    //    WITH
+    //      t1 AS (SELECT * FROM r1),
+    //      t2 AS (SELECT * FROM r1)
+    //    SELECT * FROM t2
+    //  )
+    // SELECT * FROM r2
+    // r1 should be inlined as it's only referenced once: main query -> r2 -> t2 -> r1
+    val r1 = CTERelationDef(Project(Seq(Alias(Rand(Literal(0)), "r")()), OneRowRelation()))
+    val r1Ref = CTERelationRef(r1.id, r1.resolved, r1.output, r1.isStreaming)
+    val t1 = CTERelationDef(Project(r1.output, r1Ref))
+    val t2 = CTERelationDef(Project(r1.output, r1Ref))
+    val t2Ref = CTERelationRef(t2.id, t2.resolved, t2.output, t2.isStreaming)
+    val r2 = CTERelationDef(WithCTE(Project(t2.output, t2Ref), Seq(t1, t2)))
+    val r2Ref = CTERelationRef(r2.id, r2.resolved, r2.output, r2.isStreaming)
+    val query = WithCTE(Project(r2.output, r2Ref), Seq(r1, r2))
+    val inlined = InlineCTE().apply(query)
+    assert(!inlined.exists(_.isInstanceOf[WithCTE]))
+  }
+
+  test("SPARK-49816: complicated reference count 2") {
+    // Manually build the logical plan for
+    // WITH
+    //  r1 AS (SELECT random()),
+    //  r2 AS (
+    //    WITH
+    //      t1 AS (SELECT * FROM r1),
+    //      t2 AS (SELECT * FROM t1)
+    //    SELECT * FROM t2
+    //  )
+    // SELECT * FROM r1
+    // This is similar to the previous test case, but t2 reference t1 instead of r1, and the main
+    // query references r1. r1 should be inlined as r2 is not referenced at all.
+    val r1 = CTERelationDef(Project(Seq(Alias(Rand(Literal(0)), "r")()), OneRowRelation()))
+    val r1Ref = CTERelationRef(r1.id, r1.resolved, r1.output, r1.isStreaming)
+    val t1 = CTERelationDef(Project(r1.output, r1Ref))
+    val t1Ref = CTERelationRef(t1.id, t1.resolved, t1.output, t1.isStreaming)
+    val t2 = CTERelationDef(Project(t1.output, t1Ref))
+    val t2Ref = CTERelationRef(t2.id, t2.resolved, t2.output, t2.isStreaming)
+    val r2 = CTERelationDef(WithCTE(Project(t2.output, t2Ref), Seq(t1, t2)))
+    val query = WithCTE(Project(r1.output, r1Ref), Seq(r1, r2))
+    val inlined = InlineCTE().apply(query)
+    assert(!inlined.exists(_.isInstanceOf[WithCTE]))
+  }
+
+  test("SPARK-49816: complicated reference count 3") {
+    // Manually build the logical plan for
+    // WITH
+    //  r1 AS (
+    //    WITH
+    //      t1 AS (SELECT random()),
+    //      t2 AS (SELECT * FROM t1)
+    //    SELECT * FROM t2
+    //  ),
+    //  r2 AS (
+    //    WITH
+    //      t1 AS (SELECT random()),
+    //      t2 AS (SELECT * FROM r1)
+    //    SELECT * FROM t2
+    //  )
+    // SELECT * FROM r1 UNION ALL SELECT * FROM r2
+    // The inner WITH in r1 and r2 should become `SELECT random()` and r1/r2 should be inlined.
+    val t1 = CTERelationDef(Project(Seq(Alias(Rand(Literal(0)), "r")()), OneRowRelation()))
+    val t1Ref = CTERelationRef(t1.id, t1.resolved, t1.output, t1.isStreaming)
+    val t2 = CTERelationDef(Project(t1.output, t1Ref))
+    val t2Ref = CTERelationRef(t2.id, t2.resolved, t2.output, t2.isStreaming)
+    val cte = WithCTE(Project(t2.output, t2Ref), Seq(t1, t2))
+    val r1 = CTERelationDef(cte)
+    val r1Ref = CTERelationRef(r1.id, r1.resolved, r1.output, r1.isStreaming)
+    val r2 = CTERelationDef(cte)
+    val r2Ref = CTERelationRef(r2.id, r2.resolved, r2.output, r2.isStreaming)
+    val query = WithCTE(Union(r1Ref, r2Ref), Seq(r1, r2))
+    val inlined = InlineCTE().apply(query)
+    assert(!inlined.exists(_.isInstanceOf[WithCTE]))
+  }
 }
 
 class CTEInlineSuiteAEOff extends CTEInlineSuiteBase with DisableAdaptiveExecutionSuite

