This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new d46bc35412c1 [SPARK-46625][SQL] Place IDENTIFIER placeholder in command name slot
d46bc35412c1 is described below

commit d46bc35412c1ea340b65cf36d3068da8c85919fc
Author: Wenchen Fan <[email protected]>
AuthorDate: Wed May 20 20:59:54 2026 +0800

    [SPARK-46625][SQL] Place IDENTIFIER placeholder in command name slot
    
    ### What changes were proposed in this pull request?
    
    Root-cause fix for SPARK-46625. Supersedes #55706.
    
    Push `PlanWithUnresolvedIdentifier` into the command's identifier slot at 
parse time, for every parser-built command, instead of wrapping the whole 
command. With this change, `CTESubstitution` sees the real `CTEInChildren` 
command directly and places `WithCTE` on the command's children by construction 
-- the invalid `WithCTE(InsertIntoStatement, ...)` / 
`WithCTE(CreateTableAsSelect, ...)` shape never appears.
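    
    A minimal Scala sketch of the two placements, using a toy plan model. Every
    name in it (`Plan`, `Insert`, `Placeholder`, `substituteCTE`, `materialize`)
    is a hypothetical stand-in chosen for illustration, not Spark's Catalyst API:
    
    ```scala
    // Toy model only: hypothetical stand-ins, not Spark's Catalyst classes.
    object CtePlacementSketch {
      sealed trait Plan
      case class Relation(name: String) extends Plan
      case class Query(from: String) extends Plan
      case class WithCTE(plan: Plan, cteNames: Seq[String]) extends Plan
      // A command that wants CTE definitions attached to its query, not wrapped around itself.
      case class Insert(table: Plan, query: Plan) extends Plan
      // Placeholder for IDENTIFIER(...): `build` produces the real plan once the name is known.
      case class Placeholder(identExpr: String, build: String => Plan) extends Plan
    
      // Mimics the placement decision: a command gets WithCTE on its query, anything else is wrapped.
      def substituteCTE(root: Plan, ctes: Seq[String]): Plan = root match {
        case i: Insert => i.copy(query = WithCTE(i.query, ctes))
        case other => WithCTE(other, ctes)
      }
    
      // Mimics identifier resolution: replace each placeholder with the plan it builds.
      def materialize(plan: Plan): Plan = plan match {
        case Placeholder(expr, build) => materialize(build(expr))
        case WithCTE(p, names) => WithCTE(materialize(p), names)
        case Insert(t, q) => Insert(materialize(t), materialize(q))
        case other => other
      }
    
      val query: Plan = Query("t")
    
      // Old shape: the placeholder wraps the whole command.
      val wrapped: Plan = Placeholder("tbl", name => Insert(Relation(name), query))
      // New shape: the placeholder sits in the command's table slot.
      val inSlot: Plan = Insert(Placeholder("tbl", name => Relation(name)), query)
    
      val oldResult: Plan = materialize(substituteCTE(wrapped, Seq("t")))
      val newResult: Plan = materialize(substituteCTE(inSlot, Seq("t")))
    
      def main(args: Array[String]): Unit = {
        println(oldResult)
        println(newResult)
      }
    }
    ```
    
    In this toy model `oldResult` has `WithCTE` wrapping the `Insert`, while
    `newResult` keeps `WithCTE` under the command's query.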
    
    **Parser placement (`AstBuilder`):**
    
    - `AstBuilder.withInsertInto`, `visitCreateTable`, `visitReplaceTable` 
build the command directly. A new helper `buildWriteTableSlot` returns 
`NamedRelation` and serves both `InsertIntoStatement.table` and 
`OverwriteByExpression.table` (whose slot is typed `NamedRelation`). CTAS/RTAS 
use single-arg `withIdentClause` to put the placeholder in the `name` slot.
    - `visitCacheTable`'s AS path uses a new helper 
`buildCacheTableAsSelectName` to build the temp-view name as an `Expression`. 
The non-AS path uses single-arg `withIdentClause`.
    
    **`CacheTableAsSelect.tempViewName: String -> Expression`:**
    
    The last identifier slot still expressed as a plain `String` is now an 
`Expression`. The parser produces a non-null string `Literal` for direct 
identifiers and `IDENTIFIER('literal')`, or an 
`ExpressionWithUnresolvedIdentifier` for `IDENTIFIER(<non-literal>)`. The 
single-part invariant is validated at parse time for literal cases and at 
materialization for the non-literal case. `CheckAnalysis` enforces the 
post-analysis invariant that `tempViewName` is a non-null string `Literal`, and 
[...]
    
    **Placeholder shape (`unresolved.scala`):**
    
    - `PlanWithUnresolvedIdentifier` mixes in `NamedRelation` so it can occupy 
`OverwriteByExpression.table` (typed `NamedRelation`) directly.
    - It does **not** extend `CTEInChildren`. With the in-slot placement plus 
the `CacheTableAsSelect` refactor, no parser caller places the placeholder as 
the substitution root of a `WITH ... <command>` subtree, so a `CTEInChildren` 
safety net has no reachable path under the current grammar.
    
    **Materialization (`ResolveIdentifierClause`):**
    
    `InsertIntoStatement.table` and `V2WriteCommand.table` are non-child 
`LogicalPlan` slots (`child = query`), so the default `resolveOperatorsUp` 
traversal never visits placeholders inside them. Two special cases recurse 
explicitly:
    
    - `case i: InsertIntoStatement if i.table.isInstanceOf[PlanWithUnresolvedIdentifier] => ...`
    - `case w: V2WriteCommand if w.table.isInstanceOf[PlanWithUnresolvedIdentifier] => ...`
    
    Each case extracts the placeholder with a single `asInstanceOf` at the top 
of the body and inlines the `identifierExpr.resolved && childrenResolved` 
check, returning the unchanged command when not yet ready.
    
    The `V2WriteCommand` match dispatches via the abstract 
`withNewTable(NamedRelation)` and pattern-matches the materialized result as 
`NamedRelation`, throwing an internal error otherwise. Only 
`OverwriteByExpression` is parser-built with a placeholder in `table` today; 
matching the trait keeps the rule consistent for any future analyzer-built node 
in the same shape.
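    
    Why a non-child slot needs an explicit case can be sketched with a toy tree.
    The model below is an assumption for illustration (`transformUp`, `Insert`,
    and `Placeholder` here are hypothetical and much simpler than Spark's
    `TreeNode` machinery):
    
    ```scala
    // Toy model only: hypothetical stand-ins, not Spark's TreeNode API.
    object NonChildSlotSketch {
      sealed trait Plan {
        def children: Seq[Plan]
        def withNewChildren(newChildren: Seq[Plan]): Plan
      }
      case class Relation(name: String) extends Plan {
        def children: Seq[Plan] = Nil
        def withNewChildren(newChildren: Seq[Plan]): Plan = this
      }
      case class Placeholder(identifier: String) extends Plan {
        def children: Seq[Plan] = Nil
        def withNewChildren(newChildren: Seq[Plan]): Plan = this
      }
      // `table` is a plan-typed field but not a child; only `query` is traversed.
      case class Insert(table: Plan, query: Plan) extends Plan {
        def children: Seq[Plan] = Seq(query)
        def withNewChildren(newChildren: Seq[Plan]): Plan = copy(query = newChildren.head)
      }
    
      // Bottom-up rewrite that only follows `children`.
      def transformUp(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
        val rewritten = plan.withNewChildren(plan.children.map(c => transformUp(c)(rule)))
        rule.applyOrElse(rewritten, (p: Plan) => p)
      }
    
      val resolvePlaceholder: PartialFunction[Plan, Plan] = {
        case Placeholder(id) => Relation(id)
      }
      // Explicit case that looks inside the non-child `table` slot.
      val slotCase: PartialFunction[Plan, Plan] = {
        case i @ Insert(Placeholder(id), _) => i.copy(table = Relation(id))
      }
    
      val plan: Plan = Insert(Placeholder("tbl"), Placeholder("src"))
      // Without the explicit case, the placeholder in `table` is never visited.
      val missed: Plan = transformUp(plan)(resolvePlaceholder)
      // With it, both slots are materialized.
      val handled: Plan = transformUp(plan)(resolvePlaceholder.orElse(slotCase))
    
      def main(args: Array[String]): Unit = {
        println(missed)
        println(handled)
      }
    }
    ```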
    
    **Tree-pattern propagation (`statements.scala`, `v2Commands.scala`):**
    
    `InsertIntoStatement` and `V2WriteCommand` override 
`getDefaultTreePatternBits` to union `table.treePatternBits`, so 
`containsPattern(...)` pruning correctly reports patterns (`PARAMETER`, 
`PLAN_WITH_UNRESOLVED_IDENTIFIER`) living in `table`.
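    
    The effect of the override can be sketched as follows. This is a toy model
    under stated assumptions: pattern bits are a plain `Set[Int]`, and
    `patternBits`, `InsertDefault`, and `InsertWithTableBits` are hypothetical
    names, not the real `TreePatternBits` API:
    
    ```scala
    // Toy model only: hypothetical stand-ins for tree-pattern pruning.
    object PatternBitsSketch {
      // Hypothetical pattern ids; the real enumeration is much larger.
      val PARAMETER = 0
      val PLAN_WITH_UNRESOLVED_IDENTIFIER = 1
    
      sealed trait Plan {
        def children: Seq[Plan]
        def nodePatterns: Set[Int] = Set.empty
        // Default: own patterns plus everything reported by children.
        def patternBits: Set[Int] = children.foldLeft(nodePatterns)((acc, c) => acc ++ c.patternBits)
        def containsPattern(p: Int): Boolean = patternBits.contains(p)
      }
      case class Scan(name: String) extends Plan {
        def children: Seq[Plan] = Nil
      }
      case class Placeholder(expr: String) extends Plan {
        def children: Seq[Plan] = Nil
        override def nodePatterns: Set[Int] = Set(PLAN_WITH_UNRESOLVED_IDENTIFIER, PARAMETER)
      }
      // `table` is not a child, so the default bits never include its patterns.
      case class InsertDefault(table: Plan, query: Plan) extends Plan {
        def children: Seq[Plan] = Seq(query)
      }
      // Unions the non-child `table` slot into the node's bits.
      case class InsertWithTableBits(table: Plan, query: Plan) extends Plan {
        def children: Seq[Plan] = Seq(query)
        override def patternBits: Set[Int] = super.patternBits ++ table.patternBits
      }
    
      val defaultBits: Plan = InsertDefault(Placeholder(":p"), Scan("src"))
      val withTableBits: Plan = InsertWithTableBits(Placeholder(":p"), Scan("src"))
    
      def main(args: Array[String]): Unit = {
        println(defaultBits.containsPattern(PARAMETER))
        println(withTableBits.containsPattern(PARAMETER))
      }
    }
    ```
    
    A pruning check such as `containsPattern(PARAMETER)` would skip the first
    node in this model even though its `table` slot holds a parameter.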
    
    **Parameter binding (`parameters.scala`):**
    
    `BindParameters.bind` pattern-matches both `InsertIntoStatement` and 
`V2WriteCommand` to recurse into `table`. Without this, `INSERT ... 
IDENTIFIER(:p)` and `INSERT INTO IDENTIFIER(:p) REPLACE WHERE ...` under 
`spark.sql.legacy.parameterSubstitution.constantsOnly=true` would fail to bind 
the parameter.
    
    **`CreateTableAsSelect.name` / `ReplaceTableAsSelect.name`:**
    
    Already children via `V2CreateTableAsSelectPlan.childrenToAnalyze`, so no 
extra traversal hook is needed for them.
    
    The post-hoc `WithCTE(c: CTEInChildren, _) => c.withCTEDefs(cteDefs)` 
collapse in `ResolveIdentifierClause` from #55706 is removed entirely -- no 
command shape reaches the analyzer needing it.
    
    ### Why are the changes needed?
    
    ```sql
    WITH t AS (...)
    INSERT [INTO|OVERWRITE] TABLE IDENTIFIER('t')
    SELECT * FROM t
    ```
    
    previously produced an analysed plan with `WithCTE` wrapping the 
`InsertIntoStatement` -- a structurally invalid shape. Plug-in datasources that 
re-analyse the `InsertIntoStatement`'s query subtree throw 
`NoSuchElementException: key not found`. Zero-day issue since SPARK-46625.
    
    #55706 fixed the symptom by collapsing `WithCTE(c: CTEInChildren, defs)` 
after the fact in `ResolveIdentifierClause`. That works but encodes an undo for 
a placement that should never have happened, and turned out to be unsafe: after 
a later analyzer rewrite (e.g. `RewriteDeleteFromTable` rewriting 
`WithCTE(DeleteFromTable, defs)` into `WithCTE(ReplaceData, defs)`), the 
collapse would push `WithCTE` into `ReplaceData.query` and orphan the CTE refs 
in `ReplaceData.condition` / `groupFil [...]
    
    Credit to stevomitric for surfacing in the original PR thread that the 
parser-level placement also has to work for legacy parameter substitution -- 
that's why this PR adds the targeted `BindParameters` and tree-pattern-bits 
handling for `InsertIntoStatement.table` and `V2WriteCommand.table`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No behavior change, but a minor timing change in `visitCreateTable` / 
`visitReplaceTable`: CTAS/RTAS validation errors (`Schema may not be specified 
in a CTAS statement`, `Partition column types may not be specified...`, 
`Constraints may not be specified...`) for non-literal identifiers now fire at 
parse time rather than at identifier resolution. Same error messages and same 
`ctx`; only the moment of throwing moves earlier. Fail-fast improvement, not a 
regression.
    
    ### How was this patch tested?
    
    New tests in `ParametersSuite`:
    
    - `WITH ... INSERT OVERWRITE TABLE IDENTIFIER(:p) SELECT ... FROM cte`
    - `WITH ... INSERT INTO IDENTIFIER(:p) SELECT ... FROM cte`
    - `CREATE TABLE IDENTIFIER(:p) AS WITH ... SELECT ... FROM cte`
    - `INSERT IDENTIFIER(:p) under legacy parameter substitution` (covers 
`spark.sql.legacy.parameterSubstitution.constantsOnly=true`, which exercises 
the `BindParameters` recursion into `InsertIntoStatement.table`)
    - `WITH ... INSERT INTO IDENTIFIER(:p) REPLACE WHERE ... -- parser` 
(asserts the placeholder lives in `OverwriteByExpression.table` and no 
`WithCTE(CTEInChildren, _)` shape survives `CTESubstitution`)
    - `BindParameters recurses into OverwriteByExpression.table` (rule-level 
test; full analysis would require a v2 catalog)
    - `CACHE TABLE IDENTIFIER(...) AS WITH ... SELECT ... -- parser` (asserts 
`tempViewName` holds `ExpressionWithUnresolvedIdentifier` and no 
`WithCTE(CTEInChildren, _)` shape survives `CTESubstitution`)
    - `REPLACE TABLE IDENTIFIER(...) AS WITH ... SELECT ... -- parser` (mirrors 
the CTAS test for RTAS)
    
    Each CTE test asserts that no `WithCTE(CTEInChildren, _)` shape leaks 
through analysis.
    
    `DDLParserSuite.CACHE TABLE` updated to construct `CacheTableAsSelect` with 
`Literal("t")` instead of `"t"` for the new `Expression` slot.
    
    Existing suites run clean: `ParametersSuite`, `DeltaBased/GroupBased 
Delete/Update/Merge` suites (including the rCTE-with-DML tests that were the 
original CI failures), `DataSourceV2DataFrameSuite`, `SQLViewSuite`, 
`IdentifierClauseParserSuite`, `AnalysisSuite` / `PlanParserSuite` / 
`AnalysisErrorSuite` and siblings, `DataSourceV2SQLSuiteV1Filter`, 
`InsertSuite`, `CTEInlineSuite` / `CTEHintSuite`, `CachedTableSuite`, 
`CreateTableAsSelectSuite` / `DDLParserSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude (Claude Code, Opus 4.7)
    
    Closes #55949 from cloud-fan/parser-identifier-cte-placement.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit b6432370bdcc27689ed361415022e6704ded949c)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  17 +
 .../analysis/ResolveIdentifierClause.scala         |  36 +-
 .../spark/sql/catalyst/analysis/parameters.scala   |  29 +-
 .../spark/sql/catalyst/analysis/unresolved.scala   |  18 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 379 ++++++++++++---------
 .../sql/catalyst/plans/logical/statements.scala    |  11 +
 .../sql/catalyst/plans/logical/v2Commands.scala    |  28 +-
 .../spark/sql/catalyst/parser/DDLParserSuite.scala |   2 +-
 .../datasources/v2/DataSourceV2Strategy.scala      |   3 +-
 .../org/apache/spark/sql/ParametersSuite.scala     | 206 ++++++++++-
 10 files changed, 554 insertions(+), 175 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index fa4c13bc24af..9c4fbd719a96 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -459,6 +459,23 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
           messageParameters = Map("name" -> "IDENTIFIER", "expr" -> p.identifierExpr.sql)
         )
 
+      case c: CacheTableAsSelect if c.tempViewName.resolved =>
+        // The parser builds `tempViewName` as either a `Literal[StringType]` (for direct
+        // identifiers and `IDENTIFIER('literal')`) or an `ExpressionWithUnresolvedIdentifier`
+        // that resolves to such a Literal. Validate the post-analysis shape so any future
+        // construction path that violates the invariant fails loudly here, not deep inside
+        // execution via `tempViewNameString`. The `resolved` guard ensures that when the
+        // IDENTIFIER expression itself failed to resolve (e.g. `IDENTIFIER(<unresolved-col>)`),
+        // we fall through to the catch-all `LogicalPlan` case so the user sees the proper
+        // `UNRESOLVED_COLUMN` error rather than an internal error.
+        c.tempViewName match {
+          case Literal(value, _: StringType) if value != null => // OK
+          case other =>
+            throw SparkException.internalError(
+              "CacheTableAsSelect.tempViewName must be a non-null string literal after " +
+                s"analysis, but got: ${other.sql}")
+        }
+
       case operator: LogicalPlan =>
         operator transformExpressionsDown {
           case hof: HigherOrderFunction if hof.arguments.exists {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
index 7150c81ad64e..cfa6f3358806 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.catalyst.analysis
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, VariableReference}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateView, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateView, InsertIntoStatement, LogicalPlan, V2WriteCommand}
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -70,6 +71,39 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch]
 
         executor.execute(p.planBuilder.apply(
           IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children))
+      // `InsertIntoStatement.table` and `V2WriteCommand.table` are non-child LogicalPlan slots
+      // (`child = query`), so the standard `resolveOperatorsUp` traversal never visits
+      // placeholders inside them. Materialize them explicitly. Only `InsertIntoStatement` and
+      // `OverwriteByExpression` carry a parse-time placeholder today, but matching the
+      // `V2WriteCommand` trait keeps the rule consistent across the family.
+      case i: InsertIntoStatement if i.table.isInstanceOf[PlanWithUnresolvedIdentifier] =>
+        val p = i.table.asInstanceOf[PlanWithUnresolvedIdentifier]
+        if (p.identifierExpr.resolved && p.childrenResolved) {
+          if (referredTempVars.isDefined) {
+            referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p)
+          }
+          i.copy(table = executor.execute(p.planBuilder.apply(
+            IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)))
+        } else {
+          i
+        }
+      case w: V2WriteCommand if w.table.isInstanceOf[PlanWithUnresolvedIdentifier] =>
+        val p = w.table.asInstanceOf[PlanWithUnresolvedIdentifier]
+        if (p.identifierExpr.resolved && p.childrenResolved) {
+          if (referredTempVars.isDefined) {
+            referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p)
+          }
+          executor.execute(p.planBuilder.apply(
+            IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)) match {
+            case nr: NamedRelation => w.withNewTable(nr)
+            case other =>
+              throw SparkException.internalError(
+                "PlanWithUnresolvedIdentifier in V2WriteCommand.table must materialize " +
+                  s"into a NamedRelation, but got: ${other.getClass.getName}")
+          }
+        } else {
+          w
+        }
       case other =>
         other.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_IDENTIFIER)) {
           case e: ExpressionWithUnresolvedIdentifier if e.identifierExpr.resolved =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
index bf9acb775ce1..31c835986e20 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression, SubqueryExpression, Unevaluable}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SupervisingCommand}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, SupervisingCommand, V2WriteCommand}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMAND, PARAMETER, PARAMETERIZED_QUERY, TreePattern, UNRESOLVED_WITH}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
@@ -179,9 +179,30 @@ object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase {
     p0.resolveOperatorsDownWithPruning(_.containsPattern(PARAMETER) && !stop) {
       case p1 =>
         stop = p1.isInstanceOf[ParameterizedQuery]
-        p1.transformExpressionsWithPruning(_.containsPattern(PARAMETER)) (f orElse {
-          case sub: SubqueryExpression => sub.withNewPlan(bind(sub.plan)(f))
-        })
+        // `InsertIntoStatement.table` and `V2WriteCommand.table` are non-child LogicalPlan
+        // slots, so the standard `resolveOperatorsDown` traversal never visits parameter
+        // markers inside them. Recurse explicitly so `INSERT ... IDENTIFIER(:p)` and
+        // `INSERT INTO IDENTIFIER(:p) REPLACE WHERE ...` resolve under the legacy
+        // parameter-substitution mode (SPARK-46625). Today only the `OverwriteByExpression`
+        // variant of `V2WriteCommand` is parser-built with a placeholder in `table`; the trait
+        // match keeps the rule consistent for any future analyzer-built node in the same shape.
+        val withBoundTable = p1 match {
+          case i: InsertIntoStatement if i.table.containsPattern(PARAMETER) =>
+            i.copy(table = bind(i.table)(f))
+          case w: V2WriteCommand if w.table.containsPattern(PARAMETER) =>
+            bind(w.table)(f) match {
+              case nr: NamedRelation => w.withNewTable(nr)
+              case other =>
+                throw SparkException.internalError(
+                  "Parameter binding on V2WriteCommand.table must preserve " +
+                    s"NamedRelation, but got: ${other.getClass.getName}")
+            }
+          case other => other
+        }
+        withBoundTable.transformExpressionsWithPruning(_.containsPattern(PARAMETER)) (
+          f orElse {
+            case sub: SubqueryExpression => sub.withNewPlan(bind(sub.plan)(f))
+          })
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index bac5651265d9..a5b467d0f081 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -59,12 +59,22 @@ trait UnresolvedUnaryNode extends UnaryNode with UnresolvedNode
 /**
  * A logical plan placeholder that holds the identifier clause string expression. It will be
  * replaced by the actual logical plan with the evaluated identifier string.
+ *
+ * Extends `NamedRelation` so it can occupy a `NamedRelation`-typed slot (e.g.
+ * `OverwriteByExpression.table`) directly at parse time, instead of wrapping the whole command.
+ *
+ * The parser always places this node inside the command's identifier slot (a child slot for
+ * DELETE/UPDATE/MERGE/CTAS/RTAS, or a non-child slot for `InsertIntoStatement.table` and
+ * `OverwriteByExpression.table` -- handled via explicit cases in `ResolveIdentifierClause` and
+ * `BindParameters`). It is never the substitution root of a `WITH ... <command>` subtree, so
+ * `CTEInChildren` semantics are not needed: any surrounding `WithCTE` produced by
+ * `CTESubstitution` targets the inner command directly.
  */
 case class PlanWithUnresolvedIdentifier(
     identifierExpr: Expression,
     children: Seq[LogicalPlan],
     planBuilder: (Seq[String], Seq[LogicalPlan]) => LogicalPlan)
-  extends UnresolvedNode {
+  extends UnresolvedNode with NamedRelation {
 
   def this(identifierExpr: Expression, planBuilder: Seq[String] => LogicalPlan) = {
     this(identifierExpr, Nil, (ident, _) => planBuilder(ident))
@@ -72,6 +82,12 @@ case class PlanWithUnresolvedIdentifier(
 
   final override val nodePatterns: Seq[TreePattern] = Seq(PLAN_WITH_UNRESOLVED_IDENTIFIER)
 
+  // Placeholder name used by error paths that render `NamedRelation.name` for an unresolved
+  // table reference -- e.g. `SparkStrategies.extractTableNameForError` and the `r: NamedRelation`
+  // fallback in `QueryCompilationErrors`. Renders as the SQL text of the identifier expression
+  // (e.g. `IDENTIFIER(:p)` or `concat('a', 'b')`) so error messages remain informative.
+  override def name: String = identifierExpr.sql
+
   override protected def withNewChildrenInternal(
       newChildren: IndexedSeq[LogicalPlan]): LogicalPlan =
     copy(identifierExpr, newChildren, planBuilder)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 3c1b243f35fe..2b282467b305 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -913,32 +913,31 @@ class AstBuilder extends DataTypeAstBuilder
       query: LogicalPlan,
       queryAliasCtx: TableAliasContext): LogicalPlan = withOrigin(ctx) {
     ctx match {
-      // We cannot push withIdentClause() into the write command because:
-      //   1. `PlanWithUnresolvedIdentifier` is not a NamedRelation
-      //   2. Write commands do not hold the table logical plan as a child, and we need to add
-      //      additional resolution code to resolve identifiers inside the write commands.
+      // For all `InsertIntoStatement` / `OverwriteByExpression`-producing branches, build the
+      // `table` slot directly via `buildWriteTableSlot` so that any
+      // `PlanWithUnresolvedIdentifier` lives *inside* the command's identifier slot. This
+      // preserves the `CTEInChildren` shape and lets `CTESubstitution` place `WithCTE` on the
+      // command's children correctly (SPARK-46625).
       case table: InsertIntoTableContext =>
         val insertParams = visitInsertIntoTable(table)
-        withIdentClause(insertParams.relationCtx, Seq(query), (ident, otherPlans) => {
-          createInsertIntoStatement(
-            insertParams = insertParams,
-            ident = ident,
-            query = otherPlans.head,
-            overwrite = false,
-            writePrivileges = Set(TableWritePrivilege.INSERT),
-            withSchemaEvolution = table.EVOLUTION() != null)
-        })
+        val privileges = Set(TableWritePrivilege.INSERT)
+        createInsertIntoStatement(
+          insertParams = insertParams,
+          tableSlot = buildWriteTableSlot(
+            insertParams.relationCtx, insertParams.options, privileges),
+          query = query,
+          overwrite = false,
+          withSchemaEvolution = table.EVOLUTION() != null)
       case table: InsertOverwriteTableContext =>
         val insertParams = visitInsertOverwriteTable(table)
-        withIdentClause(insertParams.relationCtx, Seq(query), (ident, otherPlans) => {
-          createInsertIntoStatement(
-            insertParams = insertParams,
-            ident = ident,
-            query = otherPlans.head,
-            overwrite = true,
-            writePrivileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE),
-            withSchemaEvolution = table.EVOLUTION() != null)
-        })
+        val privileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE)
+        createInsertIntoStatement(
+          insertParams = insertParams,
+          tableSlot = buildWriteTableSlot(
+            insertParams.relationCtx, insertParams.options, privileges),
+          query = query,
+          overwrite = true,
+          withSchemaEvolution = table.EVOLUTION() != null)
       case ctx: InsertIntoReplaceBooleanCondContext =>
         // Although REPLACE WHERE and REPLACE ON share a unified grammar rule, they have
         // different SQL semantics:
@@ -958,62 +957,56 @@ class AstBuilder extends DataTypeAstBuilder
             throw QueryParsingErrors.insertReplaceWhereTableAliasNotAllowed(ctx.tableAlias())
           }
           val options = Option(ctx.optionsClause())
-          withIdentClause(ctx.identifierReference, Seq(query), (ident, otherPlans) => {
-            val table = createUnresolvedRelation(
-              ctx = ctx.identifierReference,
-              ident = ident,
-              optionsClause = options,
-              writePrivileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE),
-              isStreaming = false)
-            val deleteExpr = expression(ctx.replaceCondition)
-            val isByName = ctx.NAME() != null
-            if (isByName) {
-              OverwriteByExpression.byName(
-                table,
-                df = otherPlans.head,
-                deleteExpr,
-                withSchemaEvolution = ctx.EVOLUTION() != null)
-            } else {
-              OverwriteByExpression.byPosition(
-                table,
-                query = otherPlans.head,
-                deleteExpr,
-                withSchemaEvolution = ctx.EVOLUTION() != null)
-            }
-          })
-        } else {
-          val insertParams = visitInsertIntoReplaceOn(ctx)
-          withIdentClause(insertParams.relationCtx, Seq(query), (ident, otherPlans) => {
-            val query = {
-              val queryAliasOpt =
-                getTableAliasWithoutColumnAlias(queryAliasCtx, "INSERT REPLACE ON")
-
-              queryAliasOpt.map { queryAlias =>
-                withOrigin(queryAliasCtx) {
-                  SubqueryAlias(queryAlias, child = otherPlans.head)
-                }
-              }.getOrElse(otherPlans.head)
-            }
-            createInsertIntoStatement(
-              insertParams = insertParams,
-              ident = ident,
+          val privileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE)
+          // `PlanWithUnresolvedIdentifier` is a `NamedRelation`, so it can occupy
+          // `OverwriteByExpression.table` directly; the materialization happens in
+          // `ResolveIdentifierClause` via its `OverwriteByExpression` special-case.
+          val table = buildWriteTableSlot(ctx.identifierReference, options, privileges)
+          val deleteExpr = expression(ctx.replaceCondition)
+          val isByName = ctx.NAME() != null
+          if (isByName) {
+            OverwriteByExpression.byName(
+              table,
+              df = query,
+              deleteExpr,
+              withSchemaEvolution = ctx.EVOLUTION() != null)
+          } else {
+            OverwriteByExpression.byPosition(
+              table,
               query = query,
-              overwrite = true,
-              writePrivileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE),
+              deleteExpr,
               withSchemaEvolution = ctx.EVOLUTION() != null)
-          })
-        }
-      case ctx: InsertIntoReplaceUsingContext =>
-        val insertParams = visitInsertIntoReplaceUsing(ctx)
-        withIdentClause(insertParams.relationCtx, Seq(query), (ident, otherPlans) => {
+          }
+        } else {
+          val insertParams = visitInsertIntoReplaceOn(ctx)
+          val privileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE)
+          val finalQuery = {
+            val queryAliasOpt =
+              getTableAliasWithoutColumnAlias(queryAliasCtx, "INSERT REPLACE ON")
+            queryAliasOpt.map { queryAlias =>
+              withOrigin(queryAliasCtx) {
+                SubqueryAlias(queryAlias, child = query)
+              }
+            }.getOrElse(query)
+          }
           createInsertIntoStatement(
             insertParams = insertParams,
-            ident = ident,
-            query = otherPlans.head,
+            tableSlot = buildWriteTableSlot(
+              insertParams.relationCtx, insertParams.options, privileges),
+            query = finalQuery,
             overwrite = true,
-            writePrivileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE),
             withSchemaEvolution = ctx.EVOLUTION() != null)
-        })
+        }
+      case ctx: InsertIntoReplaceUsingContext =>
+        val insertParams = visitInsertIntoReplaceUsing(ctx)
+        val privileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE)
+        createInsertIntoStatement(
+          insertParams = insertParams,
+          tableSlot = buildWriteTableSlot(
+            insertParams.relationCtx, insertParams.options, privileges),
+          query = query,
+          overwrite = true,
+          withSchemaEvolution = ctx.EVOLUTION() != null)
       case dir: InsertOverwriteDirContext =>
         val (isLocal, storage, provider) = visitInsertOverwriteDir(dir)
         InsertIntoDir(isLocal, storage, provider, query, overwrite = true)
@@ -1142,18 +1135,12 @@ class AstBuilder extends DataTypeAstBuilder
    */
   private def createInsertIntoStatement(
       insertParams: InsertTableParams,
-      ident: Seq[String],
+      tableSlot: LogicalPlan,
       query: LogicalPlan,
       overwrite: Boolean,
-      writePrivileges: Set[TableWritePrivilege],
       withSchemaEvolution: Boolean): InsertIntoStatement = {
     InsertIntoStatement(
-      table = createUnresolvedRelation(
-        ctx = insertParams.relationCtx,
-        ident = ident,
-        optionsClause = insertParams.options,
-        writePrivileges = writePrivileges,
-        isStreaming = false),
+      table = tableSlot,
       partitionSpec = insertParams.partitionSpec,
       userSpecifiedCols = insertParams.userSpecifiedCols,
       query = query,
@@ -1164,6 +1151,27 @@ class AstBuilder extends DataTypeAstBuilder
       withSchemaEvolution = withSchemaEvolution)
   }
 
+  /**
+   * Build the `table` slot of a write command. If the identifier reference is a constant string,
+   * returns an [[UnresolvedRelation]] directly; otherwise returns a
+   * [[PlanWithUnresolvedIdentifier]] that materializes into an [[UnresolvedRelation]] once the
+   * identifier expression is resolved. Both branches produce a [[NamedRelation]], so the result
+   * fits `NamedRelation`-typed slots (e.g. `OverwriteByExpression.table`) as well as the more
+   * general `LogicalPlan` slot of `InsertIntoStatement.table`.
+   *
+   * Placing the placeholder in the identifier slot (rather than wrapping the entire write command)
+   * preserves the `CTEInChildren` shape at parse time, so `CTESubstitution` places `WithCTE` on the
+   * command's children correctly. See SPARK-46625.
+   */
+  private def buildWriteTableSlot(
+      ctx: IdentifierReferenceContext,
+      optionsClause: Option[OptionsClauseContext],
+      writePrivileges: Set[TableWritePrivilege]): NamedRelation = {
+    withIdentClause(ctx, parts =>
+      createUnresolvedRelation(ctx, parts, optionsClause, writePrivileges, isStreaming = false))
+      .asInstanceOf[NamedRelation]
+  }
+
   /**
    * Write to a directory, returning a [[InsertIntoDir]] logical plan.
    */
@@ -5662,42 +5670,45 @@ class AstBuilder extends DataTypeAstBuilder
         bucketSpec.map(_.asTransform) ++
         clusterBySpec.map(_.asTransform)
 
-    val asSelectPlan = Option(ctx.query).map(plan).toSeq
-    withIdentClause(identifierContext, asSelectPlan, (identifiers, otherPlans) => {
-      val namedConstraints =
-        constraints.map(c => c.withTableName(identifiers.last))
-      val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
-        collation, serdeInfo, external, namedConstraints)
-      val identifier = withOrigin(identifierContext) {
-        UnresolvedIdentifier(identifiers)
-      }
-      otherPlans.headOption match {
-        case Some(_) if columns.nonEmpty =>
+    Option(ctx.query).map(plan) match {
+      case Some(query) =>
+        // CTAS path: push the identifier placeholder into the `name` slot so that
+        // `CTESubstitution` sees the `CreateTableAsSelect` (a `CTEInChildren`) directly
+        // and places `WithCTE` on its children (SPARK-46625). CTAS disallows constraints /
+        // user-specified columns / non-reference partition columns, so we don't need the
+        // identifier parts at parse time.
+        if (columns.nonEmpty) {
           operationNotAllowed(
-            "Schema may not be specified in a Create Table As Select (CTAS) statement",
-            ctx)
-
-        case Some(_) if partCols.nonEmpty =>
-          // non-reference partition columns are not allowed because schema can't be specified
+            "Schema may not be specified in a Create Table As Select (CTAS) statement", ctx)
+        }
+        if (partCols.nonEmpty) {
           operationNotAllowed(
-            "Partition column types may not be specified in Create Table As Select (CTAS)",
-            ctx)
-
-        case Some(_) if constraints.nonEmpty =>
+            "Partition column types may not be specified in Create Table As Select (CTAS)", ctx)
+        }
+        if (constraints.nonEmpty) {
           operationNotAllowed(
-            "Constraints may not be specified in a Create Table As Select (CTAS) statement",
-            ctx)
-
-        case Some(query) =>
-          CreateTableAsSelect(identifier, partitioning, query, tableSpec, Map.empty, ifNotExists)
-
-        case _ =>
+            "Constraints may not be specified in a Create Table As Select (CTAS) statement", ctx)
+        }
+        val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
+          collation, serdeInfo, external, constraints = Nil)
+        val nameSlot = withIdentClause(identifierContext, identifiers =>
+          withOrigin(identifierContext) { UnresolvedIdentifier(identifiers) })
+        CreateTableAsSelect(nameSlot, partitioning, query, tableSpec, Map.empty, ifNotExists)
+      case None =>
+        withIdentClause(identifierContext, identifiers => {
+          val namedConstraints =
+            constraints.map(c => c.withTableName(identifiers.last))
+          val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
+            collation, serdeInfo, external, namedConstraints)
+          val identifier = withOrigin(identifierContext) {
+            UnresolvedIdentifier(identifiers)
+          }
           // Note: table schema includes both the table columns list and the partition columns
           // with data type.
           val allColumns = columns ++ partCols
           CreateTable(identifier, allColumns, partitioning, tableSpec, ignoreIfExists = ifNotExists)
-      }
-    })
+        })
+    }
   }
 
   /**
@@ -5746,43 +5757,42 @@ class AstBuilder extends DataTypeAstBuilder
         clusterBySpec.map(_.asTransform)
 
     val identifierContext = ctx.replaceTableHeader().identifierReference()
-    val asSelectPlan = Option(ctx.query).map(plan).toSeq
-    withIdentClause(identifierContext, asSelectPlan, (identifiers, otherPlans) => {
-      val namedConstraints =
-        constraints.map(c => c.withTableName(identifiers.last))
-      val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
-        collation, serdeInfo, external = false, namedConstraints)
-      val identifier = withOrigin(identifierContext) {
-        UnresolvedIdentifier(identifiers)
-      }
-      otherPlans.headOption match {
-        case Some(_) if columns.nonEmpty =>
+    Option(ctx.query).map(plan) match {
+      case Some(query) =>
+        // RTAS path: push the identifier placeholder into the `name` slot (see CTAS above).
+        if (columns.nonEmpty) {
           operationNotAllowed(
-            "Schema may not be specified in a Replace Table As Select (RTAS) statement",
-            ctx)
-
-        case Some(_) if partCols.nonEmpty =>
-          // non-reference partition columns are not allowed because schema can't be specified
+            "Schema may not be specified in a Replace Table As Select (RTAS) statement", ctx)
+        }
+        if (partCols.nonEmpty) {
           operationNotAllowed(
-            "Partition column types may not be specified in Replace Table As Select (RTAS)",
-            ctx)
-
-        case Some(_) if constraints.nonEmpty =>
+            "Partition column types may not be specified in Replace Table As Select (RTAS)", ctx)
+        }
+        if (constraints.nonEmpty) {
           operationNotAllowed(
-            "Constraints may not be specified in a Replace Table As Select (RTAS) statement",
-            ctx)
-
-        case Some(query) =>
-          ReplaceTableAsSelect(identifier, partitioning, query, tableSpec,
-            writeOptions = Map.empty, orCreate = orCreate)
-
-        case _ =>
+            "Constraints may not be specified in a Replace Table As Select (RTAS) statement", ctx)
+        }
+        val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
+          collation, serdeInfo, external = false, constraints = Nil)
+        val nameSlot = withIdentClause(identifierContext, identifiers =>
+          withOrigin(identifierContext) { UnresolvedIdentifier(identifiers) })
+        ReplaceTableAsSelect(nameSlot, partitioning, query, tableSpec,
+          writeOptions = Map.empty, orCreate = orCreate)
+      case None =>
+        withIdentClause(identifierContext, identifiers => {
+          val namedConstraints =
+            constraints.map(c => c.withTableName(identifiers.last))
+          val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
+            collation, serdeInfo, external = false, namedConstraints)
+          val identifier = withOrigin(identifierContext) {
+            UnresolvedIdentifier(identifiers)
+          }
           // Note: table schema includes both the table columns list and the partition columns
           // with data type.
           val allColumns = columns ++ partCols
           ReplaceTable(identifier, allColumns, partitioning, tableSpec, orCreate = orCreate)
-      }
-    })
+        })
+    }
   }
 
   /**
@@ -6524,35 +6534,74 @@ class AstBuilder extends DataTypeAstBuilder
    * }}}
    */
   override def visitCacheTable(ctx: CacheTableContext): LogicalPlan = withOrigin(ctx) {
-    val query = Option(ctx.query).map(plan)
-    withIdentClause(ctx.identifierReference, query.toSeq, (ident, children) => {
-      if (query.isDefined && ident.length > 1) {
-        val catalogAndNamespace = ident.init
-        throw QueryParsingErrors.addCatalogInCacheTableAsSelectNotAllowedError(
-          catalogAndNamespace.quoted, ctx)
-      }
-      val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
-      val isLazy = ctx.LAZY != null
-      if (query.isDefined) {
+    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    val isLazy = ctx.LAZY != null
+    Option(ctx.query).map(plan) match {
+      case Some(query) =>
         // Disallow parameter markers in the query of the cache.
         // We need this limitation because we store the original query text, pre substitution.
-        // To lift this we would need to reconstitute the query with parameter markers replaced with
-        // the values given at CACHE TABLE time, or we would need to store the parameter values
-        // alongside the text.
-        // The same rule can be found in CREATE VIEW builder.
-        checkInvalidParameter(query.get, "the query of CACHE TABLE")
-        CacheTableAsSelect(ident.head, children.head, source(ctx.query()), isLazy, options)
-      } else {
-        CacheTable(
-          createUnresolvedRelation(
-            ctx.identifierReference,
-            ident,
-            None,
-            writePrivileges = Set.empty,
-            isStreaming = false),
-          ident, isLazy, options)
+        // To lift this we would need to reconstitute the query with parameter markers replaced
+        // with the values given at CACHE TABLE time, or we would need to store the parameter
+        // values alongside the text. The same rule can be found in CREATE VIEW builder.
+        checkInvalidParameter(query, "the query of CACHE TABLE")
+        // `CacheTableAsSelect.tempViewName` is an `Expression` slot: a `Literal` for direct
+        // identifiers and `IDENTIFIER('literal-string')`, or an
+        // `ExpressionWithUnresolvedIdentifier` for `IDENTIFIER(<non-literal>)`. Building the name
+        // as an expression avoids the wrap-the-whole-command form (where the
+        // `PlanWithUnresolvedIdentifier` would wrap the entire `CacheTableAsSelect`), which is the
+        // last shape that motivated the `WithCTE(<command>, _)` workaround chain in SPARK-46625.
+        val nameExpr = buildCacheTableAsSelectName(ctx.identifierReference, ctx)
+        CacheTableAsSelect(nameExpr, query, source(ctx.query()), isLazy, options)
+      case None =>
+        withIdentClause(ctx.identifierReference, ident => {
+          CacheTable(
+            createUnresolvedRelation(
+              ctx.identifierReference,
+              ident,
+              None,
+              writePrivileges = Set.empty,
+              isStreaming = false),
+            ident, isLazy, options)
+        })
+    }
+  }
+
+  /**
+   * Build the `tempViewName` expression for a `CACHE TABLE ... AS SELECT` command from an
+   * `identifierReference` context.
+   *
+   * `CacheTableAsSelect` requires a single-part temp view name (no catalog/namespace). For direct
+   * identifiers and `IDENTIFIER('literal-string')` we validate this at parse time and produce a
+   * non-null string `Literal`. For `IDENTIFIER(<non-literal>)` we emit an
+   * `ExpressionWithUnresolvedIdentifier` whose builder validates the single-part invariant when
+   * the identifier expression is resolved.
+   */
+  private def buildCacheTableAsSelectName(
+      ctx: IdentifierReferenceContext,
+      parentCtx: CacheTableContext): Expression = {
+    // Use the outer `parentCtx` for the multi-part error so the query context points at the
+    // whole `CACHE TABLE ... AS ...` statement, not just the identifier reference. The caller
+    // (`visitCacheTable`) already has `withOrigin(parentCtx)` in scope.
+    def singlePart(parts: Seq[String]): String = {
+      if (parts.length > 1) {
+        throw QueryParsingErrors.addCatalogInCacheTableAsSelectNotAllowedError(
+          parts.init.quoted, parentCtx)
       }
-    })
+      parts.head
+    }
+    val exprCtx = ctx.expression
+    if (exprCtx != null) {
+      expression(exprCtx) match {
+        case Literal(value, _: StringType) if value != null =>
+          Literal(singlePart(parseMultipartIdentifier(value.toString)))
+        case expr =>
+          new ExpressionWithUnresolvedIdentifier(
+            withOrigin(exprCtx) { expr },
+            parts => Literal(singlePart(parts)))
+      }
+    } else {
+      Literal(singlePart(visitMultipartIdentifier(ctx.multipartIdentifier)))
+    }
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 774c783ecf8a..4fbe71ed7d3e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.trees.{LeafLike, UnaryLike}
 import org.apache.spark.sql.connector.catalog.ColumnDefaultValue
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types.DataType
+import org.apache.spark.util.collection.BitSet
 
 /**
  * A logical plan node that contains exactly what was parsed from SQL.
@@ -210,6 +211,16 @@ case class InsertIntoStatement(
   override def child: LogicalPlan = query
   override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoStatement =
     copy(query = newChild)
+
+  // `table` is a non-child LogicalPlan slot (`child = query`), so the default tree-pattern
+  // propagation in TreeNode/QueryPlan does not see patterns inside it. Add `table`'s bits here
+  // so that `containsPattern(...)` pruning correctly reports patterns living in `table`
+  // (e.g. `PARAMETER`, `PLAN_WITH_UNRESOLVED_IDENTIFIER`).
+  override protected def getDefaultTreePatternBits: BitSet = {
+    val bits = super.getDefaultTreePatternBits
+    bits.union(table.treePatternBits)
+    bits
+  }
 }
 
 sealed abstract class InsertReplaceCriteria extends Expression with Unevaluable {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index b1ab46ee9481..5dd2f10c89cb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructType}
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.BitSet
 
 // For v2 DML commands, it may end up with the v1 fallback code path and need to build a DataFrame
 // which is required by the DS v1 API. We need to keep the analyzed input query plan to build
@@ -106,6 +107,18 @@ trait V2WriteCommand
 
   override def child: LogicalPlan = query
 
+  // `table` is a non-child slot, so the default tree-pattern propagation in TreeNode/QueryPlan
+  // does not see patterns inside it. Add `table`'s bits so that `containsPattern(...)` pruning
+  // correctly reports patterns living in `table` (e.g. `PLAN_WITH_UNRESOLVED_IDENTIFIER`,
+  // `PARAMETER`). Only `OverwriteByExpression` is constructed at parse time with a placeholder
+  // in `table`, but applying this uniformly across all `V2WriteCommand`s keeps the invariant
+  // consistent for any future analyzer-built node that lands a placeholder in the same slot.
+  override protected def getDefaultTreePatternBits: BitSet = {
+    val bits = super.getDefaultTreePatternBits
+    bits.union(table.treePatternBits)
+    bits
+  }
+
   override lazy val resolved: Boolean = table.resolved && query.resolved && outputResolved
 
   def outputResolved: Boolean = {
@@ -1955,7 +1968,7 @@ case class CacheTable(
  * The logical plan of the CACHE TABLE ... AS SELECT command.
  */
 case class CacheTableAsSelect(
-    tempViewName: String,
+    tempViewName: Expression,
     plan: LogicalPlan,
     originalText: String,
     isLazy: Boolean,
@@ -1963,6 +1976,19 @@ case class CacheTableAsSelect(
     isAnalyzed: Boolean = false,
     referredTempFunctions: Seq[String] = Seq.empty)
   extends AnalysisOnlyCommand with CTEInChildren {
+
+  /**
+   * Returns the temp view name string. Must only be called after analysis, when `tempViewName`
+   * has been resolved to a non-null string `Literal`. `CheckAnalysis` enforces this invariant.
+   */
+  def tempViewNameString: String = tempViewName match {
+    case Literal(value, _: StringType) if value != null => value.toString
+    case other =>
+      throw SparkException.internalError(
+        "CacheTableAsSelect.tempViewName must be a non-null string literal after analysis, " +
+          s"but got: ${other.sql}")
+  }
+
   override protected def withNewChildrenInternal(
       newChildren: IndexedSeq[LogicalPlan]): CacheTableAsSelect = {
     assert(!isAnalyzed)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index e334f7e4865c..1db22037d31f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2763,7 +2763,7 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(
       parsePlan("CACHE TABLE t AS SELECT * FROM testData"),
       CacheTableAsSelect(
-        "t",
+        Literal("t"),
         Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("testData"))),
         "SELECT * FROM testData",
         false,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 3c58298ec921..ed067a3f00d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -778,7 +778,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
 
     case r: CacheTableAsSelect =>
       CacheTableAsSelectExec(
-        r.tempViewName, r.plan, r.originalText, r.isLazy, r.options, r.referredTempFunctions) :: Nil
+        r.tempViewNameString, r.plan, r.originalText, r.isLazy, r.options,
+        r.referredTempFunctions) :: Nil
 
     case r: UncacheTable =>
       def isTempView(table: LogicalPlan): Boolean = table match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
index d6b22431e854..575fcc058169 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
@@ -20,10 +20,12 @@ package org.apache.spark.sql
 import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
 
 import org.apache.spark.sql.catalyst.ExtendedAnalysisException
+import org.apache.spark.sql.catalyst.analysis.{BindParameters, CTESubstitution, ExpressionWithUnresolvedIdentifier, NameParameterizedQuery, PlanWithUnresolvedIdentifier}
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.plans.logical.Limit
+import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect, CTEInChildren, Limit, OverwriteByExpression, ReplaceTableAsSelect, WithCTE}
 import org.apache.spark.sql.catalyst.trees.SQLQueryContext
+import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.functions.{array, call_function, lit, map, map_from_arrays, map_from_entries, str_to_map, struct}
 import org.apache.spark.sql.internal.SQLConf
@@ -2460,4 +2462,206 @@ class ParametersSuite extends SharedSparkSession {
       spark.sql("SELECT 1", Array.empty[Any]),
       Row(1))
   }
+
+  // SPARK-46625: WITH ... <write-with-IDENTIFIER> SELECT ... FROM cte
+  // The placeholder is pushed into the command's identifier slot at parse time, so
+  // `CTESubstitution` sees the `CTEInChildren` directly and never produces the invalid
+  // `WithCTE(InsertIntoStatement, ...)` / `WithCTE(CreateTableAsSelect, ...)` shape.
+  private def assertNoWithCTEAroundCTEInChildren(df: DataFrame): Unit = {
+    df.queryExecution.analyzed.foreach {
+      case WithCTE(_: CTEInChildren, _) =>
+        fail(s"Found invalid WithCTE(CTEInChildren, _) shape:\n${df.queryExecution.analyzed}")
+      case _ =>
+    }
+  }
+
+  test("SPARK-46625: WITH ... INSERT OVERWRITE TABLE IDENTIFIER(:p) SELECT ... FROM cte") {
+    withTable("t_cte_overwrite") {
+      sql("CREATE TABLE t_cte_overwrite (a INT) USING PARQUET")
+      sql("INSERT INTO t_cte_overwrite VALUES (10)")
+      val df = spark.sql(
+        """WITH transformation AS (SELECT 1 AS a)
+          |INSERT OVERWRITE TABLE IDENTIFIER(:tname)
+          |SELECT * FROM transformation""".stripMargin,
+        Map("tname" -> "t_cte_overwrite"))
+      assertNoWithCTEAroundCTEInChildren(df)
+      checkAnswer(spark.table("t_cte_overwrite"), Row(1))
+    }
+  }
+
+  test("SPARK-46625: WITH ... INSERT INTO IDENTIFIER(:p) SELECT ... FROM cte") {
+    withTable("t_cte_into") {
+      sql("CREATE TABLE t_cte_into (a INT) USING PARQUET")
+      val df = spark.sql(
+        """WITH transformation AS (SELECT 7 AS a)
+          |INSERT INTO IDENTIFIER(:tname)
+          |SELECT * FROM transformation""".stripMargin,
+        Map("tname" -> "t_cte_into"))
+      assertNoWithCTEAroundCTEInChildren(df)
+      checkAnswer(spark.table("t_cte_into"), Row(7))
+    }
+  }
+
+  test("SPARK-46625: CREATE TABLE IDENTIFIER(:p) AS WITH ... SELECT ... FROM cte") {
+    withTable("t_cte_ctas") {
+      val df = spark.sql(
+        """CREATE TABLE IDENTIFIER(:tname) USING PARQUET AS
+          |WITH transformation AS (SELECT 3 AS a)
+          |SELECT * FROM transformation""".stripMargin,
+        Map("tname" -> "t_cte_ctas"))
+      assertNoWithCTEAroundCTEInChildren(df)
+      checkAnswer(spark.table("t_cte_ctas"), Row(3))
+    }
+  }
+
+  // SPARK-46625: legacy parameter-substitution mode triggers the parameters.scala traversal
+  // path. The placeholder lives in `InsertIntoStatement.table`, which is *not* a child, so this
+  // exercises the `InsertIntoStatement` special-case in `BindParameters.bind` that recurses into
+  // the `table` slot, and the `getDefaultTreePatternBits` override on `InsertIntoStatement` that
+  // exposes `table`'s tree-pattern bits for pruning.
+  test("SPARK-46625: INSERT IDENTIFIER(:p) under legacy parameter substitution") {
+    withSQLConf(SQLConf.LEGACY_PARAMETER_SUBSTITUTION_CONSTANTS_ONLY.key -> "true") {
+      withTable("t_legacy_param") {
+        sql("CREATE TABLE t_legacy_param (a INT) USING PARQUET")
+        spark.sql(
+          """WITH transformation AS (SELECT 11 AS a)
+            |INSERT INTO IDENTIFIER(:tname)
+            |SELECT * FROM transformation""".stripMargin,
+          Map("tname" -> "t_legacy_param"))
+        checkAnswer(spark.table("t_legacy_param"), Row(11))
+      }
+    }
+  }
+
+  // SPARK-46625: INSERT INTO REPLACE WHERE goes through `OverwriteByExpression`, whose `table`
+  // slot is typed `NamedRelation`. `PlanWithUnresolvedIdentifier` extends `NamedRelation` so the
+  // placeholder sits in the slot directly. Verify on the parsed plan that the placeholder lives
+  // in `OverwriteByExpression.table` rather than wrapping the whole command -- running the
+  // analyzer fully would require a v2 catalog.
+  test("SPARK-46625: WITH ... INSERT INTO IDENTIFIER(:p) REPLACE WHERE ... parser") {
+    // Use a non-literal-string expression so `withIdentClause` produces
+    // `PlanWithUnresolvedIdentifier` rather than short-circuiting to `UnresolvedRelation`.
+    val parsedPlan = spark.sessionState.sqlParser.parsePlan(
+      """WITH transformation AS (SELECT 99 AS a)
+        |INSERT INTO IDENTIFIER('some' || '_table') REPLACE WHERE a = 10
+        |SELECT * FROM transformation""".stripMargin)
+    val overwrite = parsedPlan.collectFirst { case o: OverwriteByExpression => o }.getOrElse(
+      fail(s"Expected OverwriteByExpression in parsed plan:\n$parsedPlan"))
+    assert(overwrite.table.isInstanceOf[PlanWithUnresolvedIdentifier],
+      s"Expected OverwriteByExpression.table to be PlanWithUnresolvedIdentifier, " +
+        s"got ${overwrite.table.getClass.getSimpleName}:\n$parsedPlan")
+    // After CTESubstitution runs, the CTE defs should land on the command's children (because
+    // OverwriteByExpression is a CTEInChildren) -- never as `WithCTE(OverwriteByExpression, _)`.
+    val substituted = CTESubstitution.apply(parsedPlan)
+    substituted.foreach {
+      case WithCTE(_: CTEInChildren, _) =>
+        fail(s"Found invalid WithCTE(CTEInChildren, _) shape after CTESubstitution:\n$substituted")
+      case _ =>
+    }
+  }
+
+  // SPARK-46625: Parameter inside `IDENTIFIER(:p)` on REPLACE WHERE lives in
+  // `OverwriteByExpression.table`, which is a non-child slot. Verify that
+  // `BindParameters.bind` reaches into the slot via the explicit `OverwriteByExpression`
+  // recursion (parameters.scala) and that the `getDefaultTreePatternBits` override on
+  // `OverwriteByExpression` exposes the PARAMETER bit for pruning. Done at the rule level
+  // because driving REPLACE WHERE through full analysis would require a v2 catalog.
+  test("SPARK-46625: BindParameters recurses into OverwriteByExpression.table") {
+    val parsedPlan = spark.sessionState.sqlParser.parsePlan(
+      """INSERT INTO IDENTIFIER(:tname) REPLACE WHERE a = 10
+        |SELECT 1 AS a""".stripMargin)
+    val overwrite = parsedPlan.collectFirst { case o: OverwriteByExpression => o }.getOrElse(
+      fail(s"Expected OverwriteByExpression in parsed plan:\n$parsedPlan"))
+    // Pruning prerequisite: the PARAMETER bit must be visible at the OverwriteByExpression
+    // level (it lives inside `table`, which is not a child); this exercises the
+    // `getDefaultTreePatternBits` override.
+    assert(overwrite.containsPattern(PARAMETER),
+      "OverwriteByExpression.getDefaultTreePatternBits must propagate `table`'s PARAMETER bit")
+
+    val bound = BindParameters.apply(
+      NameParameterizedQuery(parsedPlan, Seq("tname"), Seq(Literal("foo_table"))))
+    val boundOverwrite = bound.collectFirst { case o: OverwriteByExpression => o }.getOrElse(
+      fail(s"Expected OverwriteByExpression in bound plan:\n$bound"))
+    assert(!boundOverwrite.table.containsPattern(PARAMETER),
+      s"Expected :tname inside OverwriteByExpression.table to be bound, got:\n$boundOverwrite")
+  }
+
+  // SPARK-46625: `CacheTableAsSelect.tempViewName` is an `Expression` slot, so an
+  // `IDENTIFIER(<non-literal>)` produces an `ExpressionWithUnresolvedIdentifier` there instead of
+  // wrapping the entire command in a `PlanWithUnresolvedIdentifier`. Verify on the parsed plan
+  // that the name slot holds the expression placeholder and no `WithCTE(CTEInChildren, _)` shape
+  // survives `CTESubstitution` (running the cache through full analysis would require the temp
+  // view machinery, so this is a parser-level test).
+  test("SPARK-46625: CACHE TABLE IDENTIFIER(...) AS WITH ... SELECT ... parser") {
+    val parsedPlan = spark.sessionState.sqlParser.parsePlan(
+      """CACHE TABLE IDENTIFIER('some' || '_view') AS
+        |WITH transformation AS (SELECT 4 AS a)
+        |SELECT * FROM transformation""".stripMargin)
+    val ctas = parsedPlan.collectFirst { case c: CacheTableAsSelect => c }.getOrElse(
+      fail(s"Expected CacheTableAsSelect in parsed plan:\n$parsedPlan"))
+    assert(ctas.tempViewName.isInstanceOf[ExpressionWithUnresolvedIdentifier],
+      s"Expected CacheTableAsSelect.tempViewName to be ExpressionWithUnresolvedIdentifier, " +
+        s"got ${ctas.tempViewName.getClass.getSimpleName}:\n$parsedPlan")
+    val substituted = CTESubstitution.apply(parsedPlan)
+    substituted.foreach {
+      case WithCTE(_: CTEInChildren, _) =>
+        fail(s"Found invalid WithCTE(CTEInChildren, _) shape after CTESubstitution:\n$substituted")
+      case _ =>
+    }
+  }
+
+  // SPARK-46625: Regression for the `if c.tempViewName.resolved` guard in CheckAnalysis. When
+  // the IDENTIFIER expression itself fails to resolve (e.g. references an unresolved column),
+  // the guard skips the invariant-validation case so the catch-all `LogicalPlan` case can
+  // produce `UNRESOLVED_COLUMN`. Without the guard, the invariant case would pre-empt this
+  // path and throw a `SparkException internal error` instead.
+  test("SPARK-46625: CACHE TABLE IDENTIFIER(<unresolved-col>) reports UNRESOLVED_COLUMN") {
+    val ex = intercept[AnalysisException] {
+      spark.sql("CACHE TABLE IDENTIFIER(unresolved_col) AS SELECT 1 AS a")
+    }
+    assert(ex.getCondition != null && ex.getCondition.startsWith("UNRESOLVED_COLUMN"),
+      s"Expected UNRESOLVED_COLUMN.*, got ${ex.getCondition}: ${ex.getMessage}")
+    assert(!ex.getMessage.contains("CacheTableAsSelect.tempViewName must be"),
+      s"Internal-error message leaked into user-facing error: ${ex.getMessage}")
+  }
+
+  // SPARK-46625: End-to-end CACHE TABLE IDENTIFIER(:p) AS WITH ... SELECT ... -- exercises the
+  // `tempViewNameString` extraction in `DataSourceV2Strategy` and the `CheckAnalysis` invariant
+  // case for `CacheTableAsSelect.tempViewName`. The parser-level test above already verifies
+  // the placement and CTE shape; this one drives the full analysis + execution path.
+  test("SPARK-46625: CACHE TABLE IDENTIFIER(:p) AS WITH ... SELECT ...") {
+    withTempView("t_cte_cache") {
+      val df = spark.sql(
+        """CACHE TABLE IDENTIFIER(:tname) AS
+          |WITH transformation AS (SELECT 21 AS a)
+          |SELECT * FROM transformation""".stripMargin,
+        Map("tname" -> "t_cte_cache"))
+      assertNoWithCTEAroundCTEInChildren(df)
+      checkAnswer(spark.table("t_cte_cache"), Row(21))
+    }
+  }
+
+  // SPARK-46625: RTAS mirrors CTAS -- the placeholder goes into `ReplaceTableAsSelect.name`
+  // at parse time. Verify on the parsed plan that the placeholder lives in that slot and that
+  // no `WithCTE(CTEInChildren, _)` shape survives `CTESubstitution`. Running RTAS through full
+  // analysis would require a v2 catalog, so this is a parser-level test.
+  test("SPARK-46625: REPLACE TABLE IDENTIFIER(...) AS WITH ... SELECT ... parser") {
+    // Use a non-literal-string expression so `withIdentClause` produces
+    // `PlanWithUnresolvedIdentifier` rather than short-circuiting to `UnresolvedIdentifier`.
+    val parsedPlan = spark.sessionState.sqlParser.parsePlan(
+      """REPLACE TABLE IDENTIFIER('some' || '_table') USING PARQUET AS
+        |WITH transformation AS (SELECT 5 AS a)
+        |SELECT * FROM transformation""".stripMargin)
+    val rtas = parsedPlan.collectFirst { case r: ReplaceTableAsSelect => r }.getOrElse(
+      fail(s"Expected ReplaceTableAsSelect in parsed plan:\n$parsedPlan"))
+    assert(rtas.name.isInstanceOf[PlanWithUnresolvedIdentifier],
+      s"Expected ReplaceTableAsSelect.name to be PlanWithUnresolvedIdentifier, " +
+        s"got ${rtas.name.getClass.getSimpleName}:\n$parsedPlan")
+    val substituted = CTESubstitution.apply(parsedPlan)
+    substituted.foreach {
+      case WithCTE(_: CTEInChildren, _) =>
+        fail(s"Found invalid WithCTE(CTEInChildren, _) shape after CTESubstitution:\n$substituted")
+      case _ =>
+    }
+  }
 }
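
Illustration (not part of the patch): the `getDefaultTreePatternBits` overrides on `InsertIntoStatement` and `V2WriteCommand` exist because `table` is a field rather than a child, so default bit propagation never visits it. The following is a minimal standalone sketch of that idea using a toy tree. Every name in it (`Node`, `Leaf`, `NaiveWrite`, `FixedWrite`, the pattern ids) is hypothetical and only loosely mirrors Catalyst's `TreeNode`; it is not Spark's implementation.

```scala
import scala.collection.immutable.BitSet

object PatternBitsSketch {
  // Hypothetical pattern id; Spark's real TreePattern enumeration is much larger.
  val Parameter = 0

  sealed trait Node {
    def children: Seq[Node]
    def ownPatterns: BitSet
    // Default propagation: own bits plus the bits of declared children only.
    def patternBits: BitSet = children.foldLeft(ownPatterns)(_ | _.patternBits)
    def containsPattern(p: Int): Boolean = patternBits.contains(p)
  }

  final case class Leaf(ownPatterns: BitSet) extends Node {
    val children: Seq[Node] = Nil
  }

  // `table` is a plain field, not a child, so the default propagation misses it.
  final case class NaiveWrite(table: Node, query: Node) extends Node {
    val children: Seq[Node] = Seq(query)
    val ownPatterns: BitSet = BitSet.empty
  }

  // Same shape, but the non-child slot's bits are unioned in explicitly.
  final case class FixedWrite(table: Node, query: Node) extends Node {
    val children: Seq[Node] = Seq(query)
    val ownPatterns: BitSet = BitSet.empty
    override def patternBits: BitSet = super.patternBits | table.patternBits
  }
}
```

With a `table` leaf carrying the `Parameter` bit, `NaiveWrite(...).containsPattern(Parameter)` is false while `FixedWrite(...).containsPattern(Parameter)` is true, which is why a pruning-based rule would skip the naive node.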

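Illustration (not part of the patch): `buildCacheTableAsSelectName` validates the single-part invariant eagerly when the name is known at parse time and defers the same check to a builder otherwise. A rough standalone sketch of that eager-versus-deferred split follows. The types `NameExpr`, `Lit`, and `Deferred` and the function `nameSlot` are hypothetical stand-ins for `Literal` and `ExpressionWithUnresolvedIdentifier`, and the exception type is an assumption chosen for the sketch.

```scala
object NameSlotSketch {
  sealed trait NameExpr
  final case class Lit(value: String) extends NameExpr
  // `build` runs later, once the identifier expression has been evaluated to name parts.
  final case class Deferred(build: Seq[String] => NameExpr) extends NameExpr

  // Reject multi-part names (catalog/namespace prefix); return the single part.
  def singlePart(parts: Seq[String]): String = {
    if (parts.length > 1) {
      throw new IllegalArgumentException(
        s"catalog/namespace not allowed: ${parts.init.mkString(".")}")
    }
    parts.head
  }

  // `known` is Some(parts) when the name is available at parse time, None otherwise.
  def nameSlot(known: Option[Seq[String]]): NameExpr = known match {
    case Some(parts) => Lit(singlePart(parts))
    case None => Deferred(parts => Lit(singlePart(parts)))
  }
}
```

Both paths end in the same validated literal, so downstream code only has to handle one resolved shape.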
