This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4e57f0629ece [SPARK-48307][SQL][FOLLOWUP] not-inlined CTE references
sibling should not fail
4e57f0629ece is described below
commit 4e57f0629ece8ec7495901d69fac5b0376eda504
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri Jun 28 21:36:00 2024 +0800
[SPARK-48307][SQL][FOLLOWUP] not-inlined CTE references sibling should not
fail
### What changes were proposed in this pull request?
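This is a follow-up to https://github.com/apache/spark/pull/46617 that fixes a
bug. When we reconstruct the `WithCTE` node, we should use the new CTE
definitions to which `inlineCTE` has already been applied. Otherwise, a
not-inlined CTE definition can still reference a sibling CTE that was inlined
and removed.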
### Why are the changes needed?
Bug fix; otherwise we may hit errors such as:
```
java.util.NoSuchElementException: key not found: 0
at scala.collection.MapOps.default(Map.scala:289)
at scala.collection.MapOps.default$(Map.scala:288)
at scala.collection.AbstractMap.default(Map.scala:420)
at scala.collection.mutable.HashMap.apply(HashMap.scala:440)
at
org.apache.spark.sql.catalyst.optimizer.PushdownPredicatesAndPruneColumnsForCTEDef$.gatherPredicatesAndAttributes(PushdownPredicatesAndPruneColumnsForCTEDef.scala:74)
at
org.apache.spark.sql.catalyst.optimizer.PushdownPredicatesAndPruneColumnsForCTEDef$.$anonfun$gatherPredicatesAndAttributes$1(PushdownPredicatesAndPruneColumnsForCTEDef.scala:68)
at scala.collection.immutable.Vector.foreach(Vector.scala:2124)
at
org.apache.spark.sql.catalyst.optimizer.PushdownPredicatesAndPruneColumnsForCTEDef$.gatherPredicatesAndAttributes(PushdownPredicatesAndPruneColumnsForCTEDef.scala:67)
```
### Does this PR introduce _any_ user-facing change?
No, the bug has not been released yet.
### How was this patch tested?
New test in `CTEInlineSuite`.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #47141 from cloud-fan/fix.
Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/catalyst/optimizer/InlineCTE.scala | 15 ++++++++-------
.../test/scala/org/apache/spark/sql/CTEInlineSuite.scala | 11 +++++++++++
2 files changed, 19 insertions(+), 7 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
index 50828b945bb4..8cc25328ce70 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala
@@ -140,7 +140,8 @@ case class InlineCTE(
cteMap: mutable.Map[Long, CTEReferenceInfo]): LogicalPlan = {
plan match {
case WithCTE(child, cteDefs) =>
- val remainingDefs = cteDefs.filter { cteDef =>
+ val notInlined = mutable.ArrayBuffer.empty[CTERelationDef]
+ cteDefs.foreach { cteDef =>
val refInfo = cteMap(cteDef.id)
if (refInfo.refCount > 0) {
val newDef = refInfo.cteDef.copy(child =
inlineCTE(refInfo.cteDef.child, cteMap))
@@ -148,17 +149,17 @@ case class InlineCTE(
cteMap(cteDef.id) = cteMap(cteDef.id).copy(
cteDef = newDef, shouldInline = inlineDecision
)
- // Retain the not-inlined CTE relations in place.
- !inlineDecision
- } else {
- keepDanglingRelations
+ if (!inlineDecision) notInlined += newDef
+ } else if (keepDanglingRelations) {
+ notInlined += refInfo.cteDef
}
}
val inlined = inlineCTE(child, cteMap)
- if (remainingDefs.isEmpty) {
+ if (notInlined.isEmpty) {
inlined
} else {
- WithCTE(inlined, remainingDefs)
+ // Retain the not-inlined CTE relations in place.
+ WithCTE(inlined, notInlined.toSeq)
}
case ref: CTERelationRef =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index a06b50d175f9..7b608b7438c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -703,6 +703,17 @@ abstract class CTEInlineSuiteBase
checkErrorTableNotFound(e, "`tab_non_exists`",
ExpectedContext("tab_non_exists", 83, 96))
}
}
+
+ test("SPARK-48307: not-inlined CTE references sibling") {
+ // v2 (used twice, non-deterministic via rand()) presumably stays a CTE def referencing v1.
+ val query = """
+ |WITH
+ |v1 AS (SELECT 1 col),
+ |v2 AS (SELECT col, rand() FROM v1)
+ |SELECT l.col FROM v2 l JOIN v2 r ON l.col = r.col
+ |""".stripMargin
+ checkAnswer(sql(query), Row(1))
+ }
}
class CTEInlineSuiteAEOff extends CTEInlineSuiteBase with
DisableAdaptiveExecutionSuite
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]