This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e57f0629ece [SPARK-48307][SQL][FOLLOWUP] not-inlined CTE references sibling should not fail
4e57f0629ece is described below

commit 4e57f0629ece8ec7495901d69fac5b0376eda504
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri Jun 28 21:36:00 2024 +0800

    [SPARK-48307][SQL][FOLLOWUP] not-inlined CTE references sibling should not fail
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up of https://github.com/apache/spark/pull/46617 to fix a
    bug. When we re-construct the `WithCTE` node, we should use the new CTE
    definitions to which `inlineCTE` has already been applied, rather than the
    original definitions.
    
    ### Why are the changes needed?
    
    This is a bug fix; otherwise we may hit errors such as:
    ```
    java.util.NoSuchElementException: key not found: 0
            at scala.collection.MapOps.default(Map.scala:289)
            at scala.collection.MapOps.default$(Map.scala:288)
            at scala.collection.AbstractMap.default(Map.scala:420)
            at scala.collection.mutable.HashMap.apply(HashMap.scala:440)
            at org.apache.spark.sql.catalyst.optimizer.PushdownPredicatesAndPruneColumnsForCTEDef$.gatherPredicatesAndAttributes(PushdownPredicatesAndPruneColumnsForCTEDef.scala:74)
            at org.apache.spark.sql.catalyst.optimizer.PushdownPredicatesAndPruneColumnsForCTEDef$.$anonfun$gatherPredicatesAndAttributes$1(PushdownPredicatesAndPruneColumnsForCTEDef.scala:68)
            at scala.collection.immutable.Vector.foreach(Vector.scala:2124)
            at org.apache.spark.sql.catalyst.optimizer.PushdownPredicatesAndPruneColumnsForCTEDef$.gatherPredicatesAndAttributes(PushdownPredicatesAndPruneColumnsForCTEDef.scala:67)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, the bug has not been released yet.
    
    ### How was this patch tested?
    
    Added a new test, "SPARK-48307: not-inlined CTE references sibling", to `CTEInlineSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #47141 from cloud-fan/fix.
    
    Lead-authored-by: Wenchen Fan <[email protected]>
    Co-authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../apache/spark/sql/catalyst/optimizer/InlineCTE.scala   | 15 ++++++++-------
 .../test/scala/org/apache/spark/sql/CTEInlineSuite.scala  | 11 +++++++++++
 2 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
index 50828b945bb4..8cc25328ce70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
@@ -140,7 +140,8 @@ case class InlineCTE(
       cteMap: mutable.Map[Long, CTEReferenceInfo]): LogicalPlan = {
     plan match {
       case WithCTE(child, cteDefs) =>
-        val remainingDefs = cteDefs.filter { cteDef =>
+        val notInlined = mutable.ArrayBuffer.empty[CTERelationDef]
+        cteDefs.foreach { cteDef =>
           val refInfo = cteMap(cteDef.id)
           if (refInfo.refCount > 0) {
             val newDef = refInfo.cteDef.copy(child = 
inlineCTE(refInfo.cteDef.child, cteMap))
@@ -148,17 +149,17 @@ case class InlineCTE(
             cteMap(cteDef.id) = cteMap(cteDef.id).copy(
               cteDef = newDef, shouldInline = inlineDecision
             )
-            // Retain the not-inlined CTE relations in place.
-            !inlineDecision
-          } else {
-            keepDanglingRelations
+            if (!inlineDecision) notInlined += newDef
+          } else if (keepDanglingRelations) {
+            notInlined += refInfo.cteDef
           }
         }
         val inlined = inlineCTE(child, cteMap)
-        if (remainingDefs.isEmpty) {
+        if (notInlined.isEmpty) {
           inlined
         } else {
-          WithCTE(inlined, remainingDefs)
+          // Retain the not-inlined CTE relations in place.
+          WithCTE(inlined, notInlined.toSeq)
         }
 
       case ref: CTERelationRef =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index a06b50d175f9..7b608b7438c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -703,6 +703,17 @@ abstract class CTEInlineSuiteBase
       checkErrorTableNotFound(e, "`tab_non_exists`", 
ExpectedContext("tab_non_exists", 83, 96))
     }
   }
+
+  test("SPARK-48307: not-inlined CTE references sibling") {
+    val df = sql(
+      """
+        |WITH
+        |v1 AS (SELECT 1 col),
+        |v2 AS (SELECT col, rand() FROM v1)
+        |SELECT l.col FROM v2 l JOIN v2 r ON l.col = r.col
+        |""".stripMargin)
+    checkAnswer(df, Row(1))
+  }
 }
 
 class CTEInlineSuiteAEOff extends CTEInlineSuiteBase with 
DisableAdaptiveExecutionSuite


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to