This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new d46bc35412c1 [SPARK-46625][SQL] Place IDENTIFIER placeholder in
command name slot
d46bc35412c1 is described below
commit d46bc35412c1ea340b65cf36d3068da8c85919fc
Author: Wenchen Fan <[email protected]>
AuthorDate: Wed May 20 20:59:54 2026 +0800
[SPARK-46625][SQL] Place IDENTIFIER placeholder in command name slot
### What changes were proposed in this pull request?
Root-cause fix for SPARK-46625. Supersedes #55706.
Push `PlanWithUnresolvedIdentifier` into the command's identifier slot at
parse time, for every parser-built command, instead of wrapping the whole
command. With this change, `CTESubstitution` sees the real `CTEInChildren`
command directly and places `WithCTE` on the command's children by construction
-- the invalid `WithCTE(InsertIntoStatement, ...)` /
`WithCTE(CreateTableAsSelect, ...)` shape never appears.
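
The difference between wrapping the command and placing the placeholder in its slot can be sketched with a toy model. The classes below (`Plan`, `Insert`, `WithCTE`, `Placeholder`, `substitute`, `materialize`) are hypothetical stand-ins that only imitate the shape of the plans discussed here; they are not Spark's classes, and the real `CTESubstitution` is considerably more involved.

```scala
// Toy model only: these case classes imitate the *shape* of Catalyst plans.
// None of them is Spark's real API.
object CtePlacementSketch {
  sealed trait Plan
  case class Query(sql: String) extends Plan
  case class Relation(name: String) extends Plan
  case class Insert(table: Plan, query: Plan) extends Plan
  case class WithCTE(child: Plan, defs: Seq[String]) extends Plan
  // Stands in for IDENTIFIER(<expr>): `build` produces the real node once `ident` is known.
  case class Placeholder(ident: String, build: String => Plan) extends Plan

  // Simplified CTE substitution: a command gets WithCTE on its query,
  // anything else is wrapped as a whole.
  def substitute(root: Plan, defs: Seq[String]): Plan = root match {
    case i: Insert => i.copy(query = WithCTE(i.query, defs))
    case other     => WithCTE(other, defs)
  }

  // Simplified identifier resolution: replace every placeholder with the plan it builds.
  def materialize(plan: Plan): Plan = plan match {
    case Placeholder(ident, build) => materialize(build(ident))
    case Insert(table, query)      => Insert(materialize(table), materialize(query))
    case WithCTE(child, defs)      => WithCTE(materialize(child), defs)
    case other                     => other
  }

  private val select = Query("SELECT * FROM t")
  // Old placement: the placeholder wraps the whole command.
  val wrapped: Plan = Placeholder("t", name => Insert(Relation(name), select))
  // New placement: the placeholder sits in the command's table slot.
  val inSlot: Plan = Insert(Placeholder("t", name => Relation(name)), select)

  val oldShape: Plan = materialize(substitute(wrapped, Seq("t")))
  val newShape: Plan = materialize(substitute(inSlot, Seq("t")))
}
```

In this model `oldShape` ends up as `WithCTE(Insert(...), ...)`, the invalid shape, while `newShape` keeps `WithCTE` under the command's query.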
**Parser placement (`AstBuilder`):**
- `AstBuilder.withInsertInto`, `visitCreateTable`, `visitReplaceTable`
build the command directly. A new helper `buildWriteTableSlot` returns
`NamedRelation` and serves both `InsertIntoStatement.table` and
`OverwriteByExpression.table` (whose slot is typed `NamedRelation`). CTAS/RTAS
use single-arg `withIdentClause` to put the placeholder in the `name` slot.
- `visitCacheTable`'s AS path uses a new helper
`buildCacheTableAsSelectName` to build the temp-view name as an `Expression`.
The non-AS path uses single-arg `withIdentClause`.
**`CacheTableAsSelect.tempViewName: String -> Expression`:**
The last identifier slot still expressed as a plain `String` is now an
`Expression`. The parser produces a non-null string `Literal` for direct
identifiers and `IDENTIFIER('literal')`, or an
`ExpressionWithUnresolvedIdentifier` for `IDENTIFIER(<non-literal>)`. The
single-part invariant is validated at parse time for literal cases and at
materialization for the non-literal case. `CheckAnalysis` enforces the
post-analysis invariant that `tempViewName` is a non-null string `Literal`, and
[...]
**Placeholder shape (`unresolved.scala`):**
- `PlanWithUnresolvedIdentifier` mixes in `NamedRelation` so it can occupy
`OverwriteByExpression.table` (typed `NamedRelation`) directly.
- It does **not** extend `CTEInChildren`. With the in-slot placement plus
the `CacheTableAsSelect` refactor, no parser caller places the placeholder as
the substitution root of a `WITH ... <command>` subtree, so a `CTEInChildren`
safety net has no reachable path under the current grammar.
**Materialization (`ResolveIdentifierClause`):**
`InsertIntoStatement.table` and `V2WriteCommand.table` are non-child
`LogicalPlan` slots (`child = query`), so the default `resolveOperatorsUp`
traversal never visits placeholders inside them. Two special cases recurse
explicitly:
- `case i: InsertIntoStatement if
i.table.isInstanceOf[PlanWithUnresolvedIdentifier] => ...`
- `case w: V2WriteCommand if
w.table.isInstanceOf[PlanWithUnresolvedIdentifier] => ...`
Each case extracts the placeholder with a single `asInstanceOf` at the top
of the body and inlines the `identifierExpr.resolved && childrenResolved`
check, returning the unchanged command when not yet ready.
The `V2WriteCommand` match dispatches via the abstract
`withNewTable(NamedRelation)` and pattern-matches the materialized result as
`NamedRelation`, throwing an internal error otherwise. Only
`OverwriteByExpression` is parser-built with a placeholder in `table` today;
matching the trait keeps the rule consistent for any future analyzer-built node
in the same shape.
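
A minimal sketch of the guard-then-materialize pattern described above, using hypothetical toy types (`Named`, `Placeholder`, `Overwrite`, `materialize`) rather than Spark's `NamedRelation`, `PlanWithUnresolvedIdentifier`, or `V2WriteCommand`. The real rule throws an internal error where this sketch returns `Left`.

```scala
// Toy model only: simplified stand-ins, not Spark's ResolveIdentifierClause.
object MaterializeSlotSketch {
  sealed trait Plan
  trait Named extends Plan { def name: String }
  case class Relation(name: String) extends Named
  case class Project(child: Plan) extends Plan // deliberately not a Named relation
  // `resolvedName` stays None until the identifier expression has been resolved.
  case class Placeholder(resolvedName: Option[String], build: String => Plan) extends Named {
    def name: String = "IDENTIFIER(...)"
  }
  // `table` is typed Named, like a slot that only accepts named relations.
  case class Overwrite(table: Named, query: Plan) extends Plan

  def materialize(w: Overwrite): Either[String, Overwrite] = w.table match {
    case Placeholder(Some(ident), build) =>
      build(ident) match {
        case n: Named => Right(w.copy(table = n))
        case other    => Left(s"expected a named relation, but got: $other")
      }
    // Placeholder not ready yet, or no placeholder at all: return the command unchanged.
    case _ => Right(w)
  }
}
```

Returning the command unchanged when the placeholder is not ready lets a later pass of the rule try again, which is the behaviour the description attributes to the two new cases.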
**Tree-pattern propagation (`statements.scala`, `v2Commands.scala`):**
`InsertIntoStatement` and `V2WriteCommand` override
`getDefaultTreePatternBits` to union `table.treePatternBits`, so
`containsPattern(...)` pruning correctly reports patterns (`PARAMETER`,
`PLAN_WITH_UNRESOLVED_IDENTIFIER`) living in `table`.
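
Why the override matters can be illustrated with a toy tree in which pattern bits are a plain `Set[String]`. Everything here (`Node`, `defaultBits`, `NaiveInsert`, `FixedInsert`) is a hypothetical simplification; Catalyst's real `TreeNode` uses bit sets of `TreePattern` values.

```scala
// Toy model only: pattern "bits" are a Set[String], not Catalyst's real bit sets.
object PatternBitsSketch {
  trait Node {
    def children: Seq[Node]
    def nodePatterns: Set[String] = Set.empty
    // Default: this node's own patterns plus whatever its children report.
    def defaultBits: Set[String] = nodePatterns ++ children.flatMap(_.bits)
    lazy val bits: Set[String] = defaultBits
    def containsPattern(p: String): Boolean = bits.contains(p)
  }
  case class Leaf(override val nodePatterns: Set[String]) extends Node {
    def children: Seq[Node] = Nil
  }
  // `table` is deliberately NOT a child, so the default bits ignore it.
  case class NaiveInsert(table: Node, query: Node) extends Node {
    def children: Seq[Node] = Seq(query)
  }
  // Same node, but the default bits are unioned with the table slot's bits.
  case class FixedInsert(table: Node, query: Node) extends Node {
    def children: Seq[Node] = Seq(query)
    override def defaultBits: Set[String] = super.defaultBits ++ table.bits
  }
}
```

With a table leaf carrying `"PARAMETER"`, `NaiveInsert` reports that it does not contain the pattern, so any pruned traversal would skip it, whereas `FixedInsert` reports it correctly.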
**Parameter binding (`parameters.scala`):**
`BindParameters.bind` pattern-matches both `InsertIntoStatement` and
`V2WriteCommand` to recurse into `table`. Without this, `INSERT ...
IDENTIFIER(:p)` and `INSERT INTO REPLACE WHERE ... IDENTIFIER(:p)` under
`spark.sql.legacy.parameterSubstitution.constantsOnly=true` would fail to bind
the parameter.
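
The failure mode can be sketched with a toy binder. The names below (`ParamRelation`, `bindChildrenOnly`, `bindWithTableSlot`) are illustrative assumptions and do not correspond to Spark's `BindParameters` API.

```scala
// Toy model only: simplified stand-ins, not Spark's BindParameters or plan classes.
object BindTableSlotSketch {
  sealed trait Plan
  case class Relation(name: String) extends Plan
  // Stands in for a table reference written as IDENTIFIER(:param).
  case class ParamRelation(param: String) extends Plan
  // `table` is a plain field; only `query` is treated as a child.
  case class Insert(table: Plan, query: Plan) extends Plan

  private def bindLeaf(p: Plan, args: Map[String, String]): Plan = p match {
    case ParamRelation(name) => args.get(name).map(n => Relation(n)).getOrElse(p)
    case other               => other
  }

  // Children-only traversal: never looks at Insert.table.
  def bindChildrenOnly(p: Plan, args: Map[String, String]): Plan = p match {
    case i: Insert => i.copy(query = bindChildrenOnly(i.query, args))
    case other     => bindLeaf(other, args)
  }

  // Same traversal plus an explicit recursion into the table slot.
  def bindWithTableSlot(p: Plan, args: Map[String, String]): Plan = p match {
    case i: Insert =>
      Insert(bindWithTableSlot(i.table, args), bindWithTableSlot(i.query, args))
    case other => bindLeaf(other, args)
  }
}
```

In this model the children-only binder leaves the parameter in `table` untouched, while the variant with the explicit recursion replaces it with the bound relation.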
**`CreateTableAsSelect.name` / `ReplaceTableAsSelect.name`:**
Already children via `V2CreateTableAsSelectPlan.childrenToAnalyze`, so no
extra traversal hook is needed for them.
The post-hoc `WithCTE(c: CTEInChildren, _) => c.withCTEDefs(cteDefs)`
collapse in `ResolveIdentifierClause` from #55706 is removed entirely -- no
command shape reaches the analyzer needing it.
### Why are the changes needed?
```sql
WITH t AS (...)
INSERT [INTO|OVERWRITE] TABLE IDENTIFIER('t')
SELECT * FROM t
```
previously produced an analysed plan with `WithCTE` wrapping the
`InsertIntoStatement` -- a structurally invalid shape. Plug-in datasources that
re-analyse the `InsertIntoStatement`'s query subtree throw
`NoSuchElementException: key not found`. The bug has been present since SPARK-46625.
#55706 fixed the symptom by collapsing `WithCTE(c: CTEInChildren, defs)`
after the fact in `ResolveIdentifierClause`. That works but encodes an undo for
a placement that should never have happened, and turned out to be unsafe: after
a later analyzer rewrite (e.g. `RewriteDeleteFromTable` rewriting
`WithCTE(DeleteFromTable, defs)` into `WithCTE(ReplaceData, defs)`), the
collapse would push `WithCTE` into `ReplaceData.query` and orphan the CTE refs
in `ReplaceData.condition` / `groupFil [...]
Credit to stevomitric for surfacing in the original PR thread that the
parser-level placement also has to work for legacy parameter substitution --
that's why this PR adds the targeted `BindParameters` and tree-pattern-bits
handling for `InsertIntoStatement.table` and `V2WriteCommand.table`.
### Does this PR introduce _any_ user-facing change?
No behavior change, but a minor timing change in `visitCreateTable` /
`visitReplaceTable`: CTAS/RTAS validation errors (`Schema may not be specified
in a CTAS statement`, `Partition column types may not be specified...`,
`Constraints may not be specified...`) for non-literal identifiers now fire at
parse time rather than at identifier resolution. The error messages and `ctx`
are unchanged; only the point at which they are thrown moves earlier. This is a
fail-fast improvement, not a regression.
### How was this patch tested?
New tests in `ParametersSuite`:
- `WITH ... INSERT OVERWRITE TABLE IDENTIFIER(:p) SELECT ... FROM cte`
- `WITH ... INSERT INTO IDENTIFIER(:p) SELECT ... FROM cte`
- `CREATE TABLE IDENTIFIER(:p) AS WITH ... SELECT ... FROM cte`
- `INSERT IDENTIFIER(:p) under legacy parameter substitution` (covers
`spark.sql.legacy.parameterSubstitution.constantsOnly=true`, which exercises
the `BindParameters` recursion into `InsertIntoStatement.table`)
- `WITH ... INSERT INTO IDENTIFIER(:p) REPLACE WHERE ... -- parser`
(asserts the placeholder lives in `OverwriteByExpression.table` and no
`WithCTE(CTEInChildren, _)` shape survives `CTESubstitution`)
- `BindParameters recurses into OverwriteByExpression.table` (rule-level
test; full analysis would require a v2 catalog)
- `CACHE TABLE IDENTIFIER(...) AS WITH ... SELECT ... -- parser` (asserts
`tempViewName` holds `ExpressionWithUnresolvedIdentifier` and no
`WithCTE(CTEInChildren, _)` shape survives `CTESubstitution`)
- `REPLACE TABLE IDENTIFIER(...) AS WITH ... SELECT ... -- parser` (mirrors
the CTAS test for RTAS)
Each CTE test asserts that no `WithCTE(CTEInChildren, _)` shape leaks
through analysis.
`DDLParserSuite.CACHE TABLE` updated to construct `CacheTableAsSelect` with
`Literal("t")` instead of `"t"` for the new `Expression` slot.
Existing suites run clean: `ParametersSuite`, `DeltaBased/GroupBased
Delete/Update/Merge` suites (including the rCTE-with-DML tests that were the
original CI failures), `DataSourceV2DataFrameSuite`, `SQLViewSuite`,
`IdentifierClauseParserSuite`, `AnalysisSuite` / `PlanParserSuite` /
`AnalysisErrorSuite` and siblings, `DataSourceV2SQLSuiteV1Filter`,
`InsertSuite`, `CTEInlineSuite` / `CTEHintSuite`, `CachedTableSuite`,
`CreateTableAsSelectSuite` / `DDLParserSuite`.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude (Claude Code, Opus 4.7)
Closes #55949 from cloud-fan/parser-identifier-cte-placement.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit b6432370bdcc27689ed361415022e6704ded949c)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/catalyst/analysis/CheckAnalysis.scala | 17 +
.../analysis/ResolveIdentifierClause.scala | 36 +-
.../spark/sql/catalyst/analysis/parameters.scala | 29 +-
.../spark/sql/catalyst/analysis/unresolved.scala | 18 +-
.../spark/sql/catalyst/parser/AstBuilder.scala | 379 ++++++++++++---------
.../sql/catalyst/plans/logical/statements.scala | 11 +
.../sql/catalyst/plans/logical/v2Commands.scala | 28 +-
.../spark/sql/catalyst/parser/DDLParserSuite.scala | 2 +-
.../datasources/v2/DataSourceV2Strategy.scala | 3 +-
.../org/apache/spark/sql/ParametersSuite.scala | 206 ++++++++++-
10 files changed, 554 insertions(+), 175 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index fa4c13bc24af..9c4fbd719a96 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -459,6 +459,23 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
messageParameters = Map("name" -> "IDENTIFIER", "expr" -> p.identifierExpr.sql)
)
+ case c: CacheTableAsSelect if c.tempViewName.resolved =>
+ // The parser builds `tempViewName` as either a `Literal[StringType]` (for direct
+ // identifiers and `IDENTIFIER('literal')`) or an `ExpressionWithUnresolvedIdentifier`
+ // that resolves to such a Literal. Validate the post-analysis shape so any future
+ // construction path that violates the invariant fails loudly here, not deep inside
+ // execution via `tempViewNameString`. The `resolved` guard ensures that when the
+ // IDENTIFIER expression itself failed to resolve (e.g. `IDENTIFIER(<unresolved-col>)`),
+ // we fall through to the catch-all `LogicalPlan` case so the user sees the proper
+ // `UNRESOLVED_COLUMN` error rather than an internal error.
+ c.tempViewName match {
+ case Literal(value, _: StringType) if value != null => // OK
+ case other =>
+ throw SparkException.internalError(
+ "CacheTableAsSelect.tempViewName must be a non-null string literal after " +
+ s"analysis, but got: ${other.sql}")
+ }
+
case operator: LogicalPlan =>
operator transformExpressionsDown {
case hof: HigherOrderFunction if hof.arguments.exists {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
index 7150c81ad64e..cfa6f3358806 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.catalyst.analysis
import scala.collection.mutable
+import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, VariableReference}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateView, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateView, InsertIntoStatement, LogicalPlan, V2WriteCommand}
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -70,6 +71,39 @@ class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch]
executor.execute(p.planBuilder.apply(
IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children))
+ // `InsertIntoStatement.table` and `V2WriteCommand.table` are non-child LogicalPlan slots
+ // (`child = query`), so the standard `resolveOperatorsUp` traversal never visits
+ // placeholders inside them. Materialize them explicitly. Only `InsertIntoStatement` and
+ // `OverwriteByExpression` carry a parse-time placeholder today, but matching the
+ // `V2WriteCommand` trait keeps the rule consistent across the family.
+ case i: InsertIntoStatement if i.table.isInstanceOf[PlanWithUnresolvedIdentifier] =>
+ val p = i.table.asInstanceOf[PlanWithUnresolvedIdentifier]
+ if (p.identifierExpr.resolved && p.childrenResolved) {
+ if (referredTempVars.isDefined) {
+ referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p)
+ }
+ i.copy(table = executor.execute(p.planBuilder.apply(
+ IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)))
+ } else {
+ i
+ }
+ case w: V2WriteCommand if w.table.isInstanceOf[PlanWithUnresolvedIdentifier] =>
+ val p = w.table.asInstanceOf[PlanWithUnresolvedIdentifier]
+ if (p.identifierExpr.resolved && p.childrenResolved) {
+ if (referredTempVars.isDefined) {
+ referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p)
+ }
+ executor.execute(p.planBuilder.apply(
+ IdentifierResolution.evalIdentifierExpr(p.identifierExpr), p.children)) match {
+ case nr: NamedRelation => w.withNewTable(nr)
+ case other =>
+ throw SparkException.internalError(
+ "PlanWithUnresolvedIdentifier in V2WriteCommand.table must materialize " +
+ s"into a NamedRelation, but got: ${other.getClass.getName}")
+ }
+ } else {
+ w
+ }
case other =>
other.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_IDENTIFIER)) {
case e: ExpressionWithUnresolvedIdentifier if e.identifierExpr.resolved =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
index bf9acb775ce1..31c835986e20 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression, SubqueryExpression, Unevaluable}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SupervisingCommand}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, SupervisingCommand, V2WriteCommand}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMAND, PARAMETER, PARAMETERIZED_QUERY, TreePattern, UNRESOLVED_WITH}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
@@ -179,9 +179,30 @@ object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase {
p0.resolveOperatorsDownWithPruning(_.containsPattern(PARAMETER) && !stop) {
case p1 =>
stop = p1.isInstanceOf[ParameterizedQuery]
- p1.transformExpressionsWithPruning(_.containsPattern(PARAMETER)) (f orElse {
- case sub: SubqueryExpression => sub.withNewPlan(bind(sub.plan)(f))
- })
+ // `InsertIntoStatement.table` and `V2WriteCommand.table` are non-child LogicalPlan
+ // slots, so the standard `resolveOperatorsDown` traversal never visits parameter
+ // markers inside them. Recurse explicitly so `INSERT ... IDENTIFIER(:p)` and
+ // `INSERT INTO IDENTIFIER(:p) REPLACE WHERE ...` resolve under the legacy
+ // parameter-substitution mode (SPARK-46625). Today only the `OverwriteByExpression`
+ // variant of `V2WriteCommand` is parser-built with a placeholder in `table`; the trait
+ // match keeps the rule consistent for any future analyzer-built node in the same shape.
+ val withBoundTable = p1 match {
+ case i: InsertIntoStatement if i.table.containsPattern(PARAMETER) =>
+ i.copy(table = bind(i.table)(f))
+ case w: V2WriteCommand if w.table.containsPattern(PARAMETER) =>
+ bind(w.table)(f) match {
+ case nr: NamedRelation => w.withNewTable(nr)
+ case other =>
+ throw SparkException.internalError(
+ "Parameter binding on V2WriteCommand.table must preserve " +
+ s"NamedRelation, but got: ${other.getClass.getName}")
+ }
+ case other => other
+ }
+ withBoundTable.transformExpressionsWithPruning(_.containsPattern(PARAMETER)) (
+ f orElse {
+ case sub: SubqueryExpression => sub.withNewPlan(bind(sub.plan)(f))
+ })
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index bac5651265d9..a5b467d0f081 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -59,12 +59,22 @@ trait UnresolvedUnaryNode extends UnaryNode with UnresolvedNode
/**
* A logical plan placeholder that holds the identifier clause string expression. It will be
* replaced by the actual logical plan with the evaluated identifier string.
+ *
+ * Extends `NamedRelation` so it can occupy a `NamedRelation`-typed slot (e.g.
+ * `OverwriteByExpression.table`) directly at parse time, instead of wrapping the whole command.
+ *
+ * The parser always places this node inside the command's identifier slot (a child slot for
+ * DELETE/UPDATE/MERGE/CTAS/RTAS, or a non-child slot for `InsertIntoStatement.table` and
+ * `OverwriteByExpression.table` -- handled via explicit cases in `ResolveIdentifierClause` and
+ * `BindParameters`). It is never the substitution root of a `WITH ... <command>` subtree, so
+ * `CTEInChildren` semantics are not needed: any surrounding `WithCTE` produced by
+ * `CTESubstitution` targets the inner command directly.
*/
case class PlanWithUnresolvedIdentifier(
identifierExpr: Expression,
children: Seq[LogicalPlan],
planBuilder: (Seq[String], Seq[LogicalPlan]) => LogicalPlan)
- extends UnresolvedNode {
+ extends UnresolvedNode with NamedRelation {
def this(identifierExpr: Expression, planBuilder: Seq[String] => LogicalPlan) = {
this(identifierExpr, Nil, (ident, _) => planBuilder(ident))
@@ -72,6 +82,12 @@ case class PlanWithUnresolvedIdentifier(
final override val nodePatterns: Seq[TreePattern] = Seq(PLAN_WITH_UNRESOLVED_IDENTIFIER)
+ // Placeholder name used by error paths that render `NamedRelation.name` for an unresolved
+ // table reference -- e.g. `SparkStrategies.extractTableNameForError` and the `r: NamedRelation`
+ // fallback in `QueryCompilationErrors`. Renders as the SQL text of the identifier expression
+ // (e.g. `IDENTIFIER(:p)` or `concat('a', 'b')`) so error messages remain informative.
+ override def name: String = identifierExpr.sql
+
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): LogicalPlan =
copy(identifierExpr, newChildren, planBuilder)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 3c1b243f35fe..2b282467b305 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -913,32 +913,31 @@ class AstBuilder extends DataTypeAstBuilder
query: LogicalPlan,
queryAliasCtx: TableAliasContext): LogicalPlan = withOrigin(ctx) {
ctx match {
- // We cannot push withIdentClause() into the write command because:
- // 1. `PlanWithUnresolvedIdentifier` is not a NamedRelation
- // 2. Write commands do not hold the table logical plan as a child, and we need to add
- // additional resolution code to resolve identifiers inside the write commands.
+ // For all `InsertIntoStatement` / `OverwriteByExpression`-producing branches, build the
+ // `table` slot directly via `buildWriteTableSlot` so that any
+ // `PlanWithUnresolvedIdentifier` lives *inside* the command's identifier slot. This
+ // preserves the `CTEInChildren` shape and lets `CTESubstitution` place `WithCTE` on the
+ // command's children correctly (SPARK-46625).
case table: InsertIntoTableContext =>
val insertParams = visitInsertIntoTable(table)
- withIdentClause(insertParams.relationCtx, Seq(query), (ident, otherPlans) => {
- createInsertIntoStatement(
- insertParams = insertParams,
- ident = ident,
- query = otherPlans.head,
- overwrite = false,
- writePrivileges = Set(TableWritePrivilege.INSERT),
- withSchemaEvolution = table.EVOLUTION() != null)
- })
+ val privileges = Set(TableWritePrivilege.INSERT)
+ createInsertIntoStatement(
+ insertParams = insertParams,
+ tableSlot = buildWriteTableSlot(
+ insertParams.relationCtx, insertParams.options, privileges),
+ query = query,
+ overwrite = false,
+ withSchemaEvolution = table.EVOLUTION() != null)
case table: InsertOverwriteTableContext =>
val insertParams = visitInsertOverwriteTable(table)
- withIdentClause(insertParams.relationCtx, Seq(query), (ident, otherPlans) => {
- createInsertIntoStatement(
- insertParams = insertParams,
- ident = ident,
- query = otherPlans.head,
- overwrite = true,
- writePrivileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE),
- withSchemaEvolution = table.EVOLUTION() != null)
- })
+ val privileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE)
+ createInsertIntoStatement(
+ insertParams = insertParams,
+ tableSlot = buildWriteTableSlot(
+ insertParams.relationCtx, insertParams.options, privileges),
+ query = query,
+ overwrite = true,
+ withSchemaEvolution = table.EVOLUTION() != null)
case ctx: InsertIntoReplaceBooleanCondContext =>
// Although REPLACE WHERE and REPLACE ON share a unified grammar rule, they have
// different SQL semantics:
@@ -958,62 +957,56 @@ class AstBuilder extends DataTypeAstBuilder
throw QueryParsingErrors.insertReplaceWhereTableAliasNotAllowed(ctx.tableAlias())
}
val options = Option(ctx.optionsClause())
- withIdentClause(ctx.identifierReference, Seq(query), (ident, otherPlans) => {
- val table = createUnresolvedRelation(
- ctx = ctx.identifierReference,
- ident = ident,
- optionsClause = options,
- writePrivileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE),
- isStreaming = false)
- val deleteExpr = expression(ctx.replaceCondition)
- val isByName = ctx.NAME() != null
- if (isByName) {
- OverwriteByExpression.byName(
- table,
- df = otherPlans.head,
- deleteExpr,
- withSchemaEvolution = ctx.EVOLUTION() != null)
- } else {
- OverwriteByExpression.byPosition(
- table,
- query = otherPlans.head,
- deleteExpr,
- withSchemaEvolution = ctx.EVOLUTION() != null)
- }
- })
- } else {
- val insertParams = visitInsertIntoReplaceOn(ctx)
- withIdentClause(insertParams.relationCtx, Seq(query), (ident, otherPlans) => {
- val query = {
- val queryAliasOpt =
- getTableAliasWithoutColumnAlias(queryAliasCtx, "INSERT REPLACE ON")
-
- queryAliasOpt.map { queryAlias =>
- withOrigin(queryAliasCtx) {
- SubqueryAlias(queryAlias, child = otherPlans.head)
- }
- }.getOrElse(otherPlans.head)
- }
- createInsertIntoStatement(
- insertParams = insertParams,
- ident = ident,
+ val privileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE)
+ // `PlanWithUnresolvedIdentifier` is a `NamedRelation`, so it can occupy
+ // `OverwriteByExpression.table` directly; the materialization happens in
+ // `ResolveIdentifierClause` via its `OverwriteByExpression` special-case.
+ val table = buildWriteTableSlot(ctx.identifierReference, options, privileges)
+ val deleteExpr = expression(ctx.replaceCondition)
+ val isByName = ctx.NAME() != null
+ if (isByName) {
+ OverwriteByExpression.byName(
+ table,
+ df = query,
+ deleteExpr,
+ withSchemaEvolution = ctx.EVOLUTION() != null)
+ } else {
+ OverwriteByExpression.byPosition(
+ table,
query = query,
- overwrite = true,
- writePrivileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE),
+ deleteExpr,
withSchemaEvolution = ctx.EVOLUTION() != null)
- })
- }
- case ctx: InsertIntoReplaceUsingContext =>
- val insertParams = visitInsertIntoReplaceUsing(ctx)
- withIdentClause(insertParams.relationCtx, Seq(query), (ident, otherPlans) => {
+ }
+ } else {
+ val insertParams = visitInsertIntoReplaceOn(ctx)
+ val privileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE)
+ val finalQuery = {
+ val queryAliasOpt =
+ getTableAliasWithoutColumnAlias(queryAliasCtx, "INSERT REPLACE ON")
+ queryAliasOpt.map { queryAlias =>
+ withOrigin(queryAliasCtx) {
+ SubqueryAlias(queryAlias, child = query)
+ }
+ }.getOrElse(query)
+ }
createInsertIntoStatement(
insertParams = insertParams,
- ident = ident,
- query = otherPlans.head,
+ tableSlot = buildWriteTableSlot(
+ insertParams.relationCtx, insertParams.options, privileges),
+ query = finalQuery,
overwrite = true,
- writePrivileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE),
withSchemaEvolution = ctx.EVOLUTION() != null)
- })
+ }
+ case ctx: InsertIntoReplaceUsingContext =>
+ val insertParams = visitInsertIntoReplaceUsing(ctx)
+ val privileges = Set(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE)
+ createInsertIntoStatement(
+ insertParams = insertParams,
+ tableSlot = buildWriteTableSlot(
+ insertParams.relationCtx, insertParams.options, privileges),
+ query = query,
+ overwrite = true,
+ withSchemaEvolution = ctx.EVOLUTION() != null)
case dir: InsertOverwriteDirContext =>
val (isLocal, storage, provider) = visitInsertOverwriteDir(dir)
InsertIntoDir(isLocal, storage, provider, query, overwrite = true)
@@ -1142,18 +1135,12 @@ class AstBuilder extends DataTypeAstBuilder
*/
private def createInsertIntoStatement(
insertParams: InsertTableParams,
- ident: Seq[String],
+ tableSlot: LogicalPlan,
query: LogicalPlan,
overwrite: Boolean,
- writePrivileges: Set[TableWritePrivilege],
withSchemaEvolution: Boolean): InsertIntoStatement = {
InsertIntoStatement(
- table = createUnresolvedRelation(
- ctx = insertParams.relationCtx,
- ident = ident,
- optionsClause = insertParams.options,
- writePrivileges = writePrivileges,
- isStreaming = false),
+ table = tableSlot,
partitionSpec = insertParams.partitionSpec,
userSpecifiedCols = insertParams.userSpecifiedCols,
query = query,
@@ -1164,6 +1151,27 @@ class AstBuilder extends DataTypeAstBuilder
withSchemaEvolution = withSchemaEvolution)
}
+ /**
+ * Build the `table` slot of a write command. If the identifier reference is a constant string,
+ * returns an [[UnresolvedRelation]] directly; otherwise returns a
+ * [[PlanWithUnresolvedIdentifier]] that materializes into an [[UnresolvedRelation]] once the
+ * identifier expression is resolved. Both branches produce a [[NamedRelation]], so the result
+ * fits `NamedRelation`-typed slots (e.g. `OverwriteByExpression.table`) as well as the more
+ * general `LogicalPlan` slot of `InsertIntoStatement.table`.
+ *
+ * Placing the placeholder in the identifier slot (rather than wrapping the entire write command)
+ * preserves the `CTEInChildren` shape at parse time, so `CTESubstitution` places `WithCTE` on the
+ * command's children correctly. See SPARK-46625.
+ */
+ private def buildWriteTableSlot(
+ ctx: IdentifierReferenceContext,
+ optionsClause: Option[OptionsClauseContext],
+ writePrivileges: Set[TableWritePrivilege]): NamedRelation = {
+ withIdentClause(ctx, parts =>
+ createUnresolvedRelation(ctx, parts, optionsClause, writePrivileges, isStreaming = false))
+ .asInstanceOf[NamedRelation]
+ }
+
/**
* Write to a directory, returning a [[InsertIntoDir]] logical plan.
*/
@@ -5662,42 +5670,45 @@ class AstBuilder extends DataTypeAstBuilder
bucketSpec.map(_.asTransform) ++
clusterBySpec.map(_.asTransform)
- val asSelectPlan = Option(ctx.query).map(plan).toSeq
- withIdentClause(identifierContext, asSelectPlan, (identifiers, otherPlans) => {
- val namedConstraints =
- constraints.map(c => c.withTableName(identifiers.last))
- val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
- collation, serdeInfo, external, namedConstraints)
- val identifier = withOrigin(identifierContext) {
- UnresolvedIdentifier(identifiers)
- }
- otherPlans.headOption match {
- case Some(_) if columns.nonEmpty =>
+ Option(ctx.query).map(plan) match {
+ case Some(query) =>
+ // CTAS path: push the identifier placeholder into the `name` slot so that
+ // `CTESubstitution` sees the `CreateTableAsSelect` (a `CTEInChildren`) directly
+ // and places `WithCTE` on its children (SPARK-46625). CTAS disallows constraints /
+ // user-specified columns / non-reference partition columns, so we don't need the
+ // identifier parts at parse time.
+ if (columns.nonEmpty) {
operationNotAllowed(
- "Schema may not be specified in a Create Table As Select (CTAS) statement",
- ctx)
-
- case Some(_) if partCols.nonEmpty =>
- // non-reference partition columns are not allowed because schema can't be specified
+ "Schema may not be specified in a Create Table As Select (CTAS) statement", ctx)
+ }
+ if (partCols.nonEmpty) {
operationNotAllowed(
- "Partition column types may not be specified in Create Table As Select (CTAS)",
- ctx)
-
- case Some(_) if constraints.nonEmpty =>
+ "Partition column types may not be specified in Create Table As Select (CTAS)", ctx)
+ }
+ if (constraints.nonEmpty) {
operationNotAllowed(
- "Constraints may not be specified in a Create Table As Select (CTAS) statement",
- ctx)
-
- case Some(query) =>
- CreateTableAsSelect(identifier, partitioning, query, tableSpec, Map.empty, ifNotExists)
-
- case _ =>
+ "Constraints may not be specified in a Create Table As Select (CTAS) statement", ctx)
+ }
+ val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
+ collation, serdeInfo, external, constraints = Nil)
+ val nameSlot = withIdentClause(identifierContext, identifiers =>
+ withOrigin(identifierContext) { UnresolvedIdentifier(identifiers) })
+ CreateTableAsSelect(nameSlot, partitioning, query, tableSpec, Map.empty, ifNotExists)
+ case None =>
+ withIdentClause(identifierContext, identifiers => {
+ val namedConstraints =
+ constraints.map(c => c.withTableName(identifiers.last))
+ val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
+ collation, serdeInfo, external, namedConstraints)
+ val identifier = withOrigin(identifierContext) {
+ UnresolvedIdentifier(identifiers)
+ }
// Note: table schema includes both the table columns list and the partition columns
// with data type.
val allColumns = columns ++ partCols
CreateTable(identifier, allColumns, partitioning, tableSpec, ignoreIfExists = ifNotExists)
- }
- })
+ })
+ }
}
/**
@@ -5746,43 +5757,42 @@ class AstBuilder extends DataTypeAstBuilder
clusterBySpec.map(_.asTransform)
val identifierContext = ctx.replaceTableHeader().identifierReference()
- val asSelectPlan = Option(ctx.query).map(plan).toSeq
- withIdentClause(identifierContext, asSelectPlan, (identifiers, otherPlans) => {
- val namedConstraints =
- constraints.map(c => c.withTableName(identifiers.last))
- val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
- collation, serdeInfo, external = false, namedConstraints)
- val identifier = withOrigin(identifierContext) {
- UnresolvedIdentifier(identifiers)
- }
- otherPlans.headOption match {
- case Some(_) if columns.nonEmpty =>
+ Option(ctx.query).map(plan) match {
+ case Some(query) =>
+ // RTAS path: push the identifier placeholder into the `name` slot
(see CTAS above).
+ if (columns.nonEmpty) {
operationNotAllowed(
- "Schema may not be specified in a Replace Table As Select (RTAS) statement",
- ctx)
-
- case Some(_) if partCols.nonEmpty =>
- // non-reference partition columns are not allowed because schema can't be specified
+ "Schema may not be specified in a Replace Table As Select (RTAS) statement", ctx)
+ }
+ if (partCols.nonEmpty) {
operationNotAllowed(
- "Partition column types may not be specified in Replace Table As Select (RTAS)",
- ctx)
-
- case Some(_) if constraints.nonEmpty =>
+ "Partition column types may not be specified in Replace Table As Select (RTAS)", ctx)
+ }
+ if (constraints.nonEmpty) {
operationNotAllowed(
- "Constraints may not be specified in a Replace Table As Select (RTAS) statement",
- ctx)
-
- case Some(query) =>
- ReplaceTableAsSelect(identifier, partitioning, query, tableSpec,
- writeOptions = Map.empty, orCreate = orCreate)
-
- case _ =>
+ "Constraints may not be specified in a Replace Table As Select (RTAS) statement", ctx)
+ }
+ val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
+ collation, serdeInfo, external = false, constraints = Nil)
+ val nameSlot = withIdentClause(identifierContext, identifiers =>
+ withOrigin(identifierContext) { UnresolvedIdentifier(identifiers) })
+ ReplaceTableAsSelect(nameSlot, partitioning, query, tableSpec,
+ writeOptions = Map.empty, orCreate = orCreate)
+ case None =>
+ withIdentClause(identifierContext, identifiers => {
+ val namedConstraints =
+ constraints.map(c => c.withTableName(identifiers.last))
+ val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
+ collation, serdeInfo, external = false, namedConstraints)
+ val identifier = withOrigin(identifierContext) {
+ UnresolvedIdentifier(identifiers)
+ }
// Note: table schema includes both the table columns list and the partition columns
// with data type.
val allColumns = columns ++ partCols
ReplaceTable(identifier, allColumns, partitioning, tableSpec, orCreate = orCreate)
- }
- })
+ })
+ }
}
/**
@@ -6524,35 +6534,74 @@ class AstBuilder extends DataTypeAstBuilder
* }}}
*/
override def visitCacheTable(ctx: CacheTableContext): LogicalPlan = withOrigin(ctx) {
- val query = Option(ctx.query).map(plan)
- withIdentClause(ctx.identifierReference, query.toSeq, (ident, children) => {
- if (query.isDefined && ident.length > 1) {
- val catalogAndNamespace = ident.init
- throw QueryParsingErrors.addCatalogInCacheTableAsSelectNotAllowedError(
- catalogAndNamespace.quoted, ctx)
- }
- val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
- val isLazy = ctx.LAZY != null
- if (query.isDefined) {
+ val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+ val isLazy = ctx.LAZY != null
+ Option(ctx.query).map(plan) match {
+ case Some(query) =>
// Disallow parameter markers in the query of the cache.
// We need this limitation because we store the original query text, pre substitution.
- // To lift this we would need to reconstitute the query with parameter markers replaced with
- // the values given at CACHE TABLE time, or we would need to store the parameter values
- // alongside the text.
- // The same rule can be found in CREATE VIEW builder.
- checkInvalidParameter(query.get, "the query of CACHE TABLE")
- CacheTableAsSelect(ident.head, children.head, source(ctx.query()), isLazy, options)
- } else {
- CacheTable(
- createUnresolvedRelation(
- ctx.identifierReference,
- ident,
- None,
- writePrivileges = Set.empty,
- isStreaming = false),
- ident, isLazy, options)
+ // To lift this we would need to reconstitute the query with parameter markers replaced
+ // with the values given at CACHE TABLE time, or we would need to store the parameter
+ // values alongside the text. The same rule can be found in CREATE VIEW builder.
+ checkInvalidParameter(query, "the query of CACHE TABLE")
+ // `CacheTableAsSelect.tempViewName` is an `Expression` slot: a `Literal` for direct
+ // identifiers and `IDENTIFIER('literal-string')`, or an
+ // `ExpressionWithUnresolvedIdentifier` for `IDENTIFIER(<non-literal>)`. Building the name
+ // as an expression avoids the wrap-the-whole-command form (where the
+ // `PlanWithUnresolvedIdentifier` would wrap the entire `CacheTableAsSelect`), which is the
+ // last shape that motivated the `WithCTE(<command>, _)` workaround chain in SPARK-46625.
+ val nameExpr = buildCacheTableAsSelectName(ctx.identifierReference, ctx)
+ CacheTableAsSelect(nameExpr, query, source(ctx.query()), isLazy, options)
+ case None =>
+ withIdentClause(ctx.identifierReference, ident => {
+ CacheTable(
+ createUnresolvedRelation(
+ ctx.identifierReference,
+ ident,
+ None,
+ writePrivileges = Set.empty,
+ isStreaming = false),
+ ident, isLazy, options)
+ })
+ }
+ }
+
+ /**
+ * Build the `tempViewName` expression for a `CACHE TABLE ... AS SELECT` command from an
+ * `identifierReference` context.
+ *
+ * `CacheTableAsSelect` requires a single-part temp view name (no catalog/namespace). For direct
+ * identifiers and `IDENTIFIER('literal-string')` we validate this at parse time and produce a
+ * non-null string `Literal`. For `IDENTIFIER(<non-literal>)` we emit an
+ * `ExpressionWithUnresolvedIdentifier` whose builder validates the single-part invariant when
+ * the identifier expression is resolved.
+ */
+ private def buildCacheTableAsSelectName(
+ ctx: IdentifierReferenceContext,
+ parentCtx: CacheTableContext): Expression = {
+ // Use the outer `parentCtx` for the multi-part error so the query context points at the
+ // whole `CACHE TABLE ... AS ...` statement, not just the identifier reference. The caller
+ // (`visitCacheTable`) already has `withOrigin(parentCtx)` in scope.
+ def singlePart(parts: Seq[String]): String = {
+ if (parts.length > 1) {
+ throw QueryParsingErrors.addCatalogInCacheTableAsSelectNotAllowedError(
+ parts.init.quoted, parentCtx)
}
- })
+ parts.head
+ }
+ val exprCtx = ctx.expression
+ if (exprCtx != null) {
+ expression(exprCtx) match {
+ case Literal(value, _: StringType) if value != null =>
+ Literal(singlePart(parseMultipartIdentifier(value.toString)))
+ case expr =>
+ new ExpressionWithUnresolvedIdentifier(
+ withOrigin(exprCtx) { expr },
+ parts => Literal(singlePart(parts)))
+ }
+ } else {
+ Literal(singlePart(visitMultipartIdentifier(ctx.multipartIdentifier)))
+ }
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 774c783ecf8a..4fbe71ed7d3e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.trees.{LeafLike, UnaryLike}
import org.apache.spark.sql.connector.catalog.ColumnDefaultValue
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types.DataType
+import org.apache.spark.util.collection.BitSet
/**
* A logical plan node that contains exactly what was parsed from SQL.
@@ -210,6 +211,16 @@ case class InsertIntoStatement(
override def child: LogicalPlan = query
override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoStatement =
copy(query = newChild)
+
+ // `table` is a non-child LogicalPlan slot (`child = query`), so the default tree-pattern
+ // propagation in TreeNode/QueryPlan does not see patterns inside it. Add `table`'s bits here
+ // so that `containsPattern(...)` pruning correctly reports patterns living in `table`
+ // (e.g. `PARAMETER`, `PLAN_WITH_UNRESOLVED_IDENTIFIER`).
+ override protected def getDefaultTreePatternBits: BitSet = {
+ val bits = super.getDefaultTreePatternBits
+ bits.union(table.treePatternBits)
+ bits
+ }
}
sealed abstract class InsertReplaceCriteria extends Expression with Unevaluable {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index b1ab46ee9481..5dd2f10c89cb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructType}
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.BitSet
// For v2 DML commands, it may end up with the v1 fallback code path and need to build a DataFrame
// which is required by the DS v1 API. We need to keep the analyzed input query plan to build
@@ -106,6 +107,18 @@ trait V2WriteCommand
override def child: LogicalPlan = query
+ // `table` is a non-child slot, so the default tree-pattern propagation in TreeNode/QueryPlan
+ // does not see patterns inside it. Add `table`'s bits so that `containsPattern(...)` pruning
+ // correctly reports patterns living in `table` (e.g. `PLAN_WITH_UNRESOLVED_IDENTIFIER`,
+ // `PARAMETER`). Only `OverwriteByExpression` is constructed at parse time with a placeholder
+ // in `table`, but applying this uniformly across all `V2WriteCommand`s keeps the invariant
+ // consistent for any future analyzer-built node that lands a placeholder in the same slot.
+ override protected def getDefaultTreePatternBits: BitSet = {
+ val bits = super.getDefaultTreePatternBits
+ bits.union(table.treePatternBits)
+ bits
+ }
+
override lazy val resolved: Boolean = table.resolved && query.resolved && outputResolved
def outputResolved: Boolean = {
@@ -1955,7 +1968,7 @@ case class CacheTable(
* The logical plan of the CACHE TABLE ... AS SELECT command.
*/
case class CacheTableAsSelect(
- tempViewName: String,
+ tempViewName: Expression,
plan: LogicalPlan,
originalText: String,
isLazy: Boolean,
@@ -1963,6 +1976,19 @@ case class CacheTableAsSelect(
isAnalyzed: Boolean = false,
referredTempFunctions: Seq[String] = Seq.empty)
extends AnalysisOnlyCommand with CTEInChildren {
+
+ /**
+ * Returns the temp view name string. Must only be called after analysis, when `tempViewName`
+ * has been resolved to a non-null string `Literal`. `CheckAnalysis` enforces this invariant.
+ */
+ def tempViewNameString: String = tempViewName match {
+ case Literal(value, _: StringType) if value != null => value.toString
+ case other =>
+ throw SparkException.internalError(
+ "CacheTableAsSelect.tempViewName must be a non-null string literal after analysis, " +
+ s"but got: ${other.sql}")
+ }
+
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): CacheTableAsSelect = {
assert(!isAnalyzed)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index e334f7e4865c..1db22037d31f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2763,7 +2763,7 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan("CACHE TABLE t AS SELECT * FROM testData"),
CacheTableAsSelect(
- "t",
+ Literal("t"),
Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("testData"))),
"SELECT * FROM testData",
false,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 3c58298ec921..ed067a3f00d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -778,7 +778,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case r: CacheTableAsSelect =>
CacheTableAsSelectExec(
- r.tempViewName, r.plan, r.originalText, r.isLazy, r.options, r.referredTempFunctions) :: Nil
+ r.tempViewNameString, r.plan, r.originalText, r.isLazy, r.options,
+ r.referredTempFunctions) :: Nil
case r: UncacheTable =>
def isTempView(table: LogicalPlan): Boolean = table match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
index d6b22431e854..575fcc058169 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
@@ -20,10 +20,12 @@ package org.apache.spark.sql
import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
import org.apache.spark.sql.catalyst.ExtendedAnalysisException
+import org.apache.spark.sql.catalyst.analysis.{BindParameters, CTESubstitution, ExpressionWithUnresolvedIdentifier, NameParameterizedQuery, PlanWithUnresolvedIdentifier}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.plans.logical.Limit
+import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect, CTEInChildren, Limit, OverwriteByExpression, ReplaceTableAsSelect, WithCTE}
import org.apache.spark.sql.catalyst.trees.SQLQueryContext
+import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.functions.{array, call_function, lit, map, map_from_arrays, map_from_entries, str_to_map, struct}
import org.apache.spark.sql.internal.SQLConf
@@ -2460,4 +2462,206 @@ class ParametersSuite extends SharedSparkSession {
spark.sql("SELECT 1", Array.empty[Any]),
Row(1))
}
+
+ // SPARK-46625: WITH ... <write-with-IDENTIFIER> SELECT ... FROM cte
+ // The placeholder is pushed into the command's identifier slot at parse time, so
+ // `CTESubstitution` sees the `CTEInChildren` directly and never produces the invalid
+ // `WithCTE(InsertIntoStatement, ...)` / `WithCTE(CreateTableAsSelect, ...)` shape.
+ private def assertNoWithCTEAroundCTEInChildren(df: DataFrame): Unit = {
+ df.queryExecution.analyzed.foreach {
+ case WithCTE(_: CTEInChildren, _) =>
+ fail(s"Found invalid WithCTE(CTEInChildren, _) shape:\n${df.queryExecution.analyzed}")
+ case _ =>
+ }
+ }
+
+ test("SPARK-46625: WITH ... INSERT OVERWRITE TABLE IDENTIFIER(:p) SELECT ... FROM cte") {
+ withTable("t_cte_overwrite") {
+ sql("CREATE TABLE t_cte_overwrite (a INT) USING PARQUET")
+ sql("INSERT INTO t_cte_overwrite VALUES (10)")
+ val df = spark.sql(
+ """WITH transformation AS (SELECT 1 AS a)
+ |INSERT OVERWRITE TABLE IDENTIFIER(:tname)
+ |SELECT * FROM transformation""".stripMargin,
+ Map("tname" -> "t_cte_overwrite"))
+ assertNoWithCTEAroundCTEInChildren(df)
+ checkAnswer(spark.table("t_cte_overwrite"), Row(1))
+ }
+ }
+
+ test("SPARK-46625: WITH ... INSERT INTO IDENTIFIER(:p) SELECT ... FROM cte") {
+ withTable("t_cte_into") {
+ sql("CREATE TABLE t_cte_into (a INT) USING PARQUET")
+ val df = spark.sql(
+ """WITH transformation AS (SELECT 7 AS a)
+ |INSERT INTO IDENTIFIER(:tname)
+ |SELECT * FROM transformation""".stripMargin,
+ Map("tname" -> "t_cte_into"))
+ assertNoWithCTEAroundCTEInChildren(df)
+ checkAnswer(spark.table("t_cte_into"), Row(7))
+ }
+ }
+
+ test("SPARK-46625: CREATE TABLE IDENTIFIER(:p) AS WITH ... SELECT ... FROM cte") {
+ withTable("t_cte_ctas") {
+ val df = spark.sql(
+ """CREATE TABLE IDENTIFIER(:tname) USING PARQUET AS
+ |WITH transformation AS (SELECT 3 AS a)
+ |SELECT * FROM transformation""".stripMargin,
+ Map("tname" -> "t_cte_ctas"))
+ assertNoWithCTEAroundCTEInChildren(df)
+ checkAnswer(spark.table("t_cte_ctas"), Row(3))
+ }
+ }
+
+ // SPARK-46625: legacy parameter-substitution mode triggers the parameters.scala traversal
+ // path. The placeholder lives in `InsertIntoStatement.table`, which is *not* a child, so this
+ // exercises the `InsertIntoStatement` special-case in `BindParameters.bind` that recurses into
+ // the `table` slot, and the `getDefaultTreePatternBits` override on `InsertIntoStatement` that
+ // exposes `table`'s tree-pattern bits for pruning.
+ test("SPARK-46625: INSERT IDENTIFIER(:p) under legacy parameter substitution") {
+ withSQLConf(SQLConf.LEGACY_PARAMETER_SUBSTITUTION_CONSTANTS_ONLY.key -> "true") {
+ withTable("t_legacy_param") {
+ sql("CREATE TABLE t_legacy_param (a INT) USING PARQUET")
+ spark.sql(
+ """WITH transformation AS (SELECT 11 AS a)
+ |INSERT INTO IDENTIFIER(:tname)
+ |SELECT * FROM transformation""".stripMargin,
+ Map("tname" -> "t_legacy_param"))
+ checkAnswer(spark.table("t_legacy_param"), Row(11))
+ }
+ }
+ }
+
+ // SPARK-46625: INSERT INTO REPLACE WHERE goes through `OverwriteByExpression`, whose `table`
+ // slot is typed `NamedRelation`. `PlanWithUnresolvedIdentifier` extends `NamedRelation` so the
+ // placeholder sits in the slot directly. Verify on the parsed plan that the placeholder lives
+ // in `OverwriteByExpression.table` rather than wrapping the whole command -- running the
+ // analyzer fully would require a v2 catalog.
+ test("SPARK-46625: WITH ... INSERT INTO IDENTIFIER(:p) REPLACE WHERE ... parser") {
+ // Use a non-literal-string expression so `withIdentClause` produces
+ // `PlanWithUnresolvedIdentifier` rather than short-circuiting to `UnresolvedRelation`.
+ val parsedPlan = spark.sessionState.sqlParser.parsePlan(
+ """WITH transformation AS (SELECT 99 AS a)
+ |INSERT INTO IDENTIFIER('some' || '_table') REPLACE WHERE a = 10
+ |SELECT * FROM transformation""".stripMargin)
+ val overwrite = parsedPlan.collectFirst { case o: OverwriteByExpression => o }.getOrElse(
+ fail(s"Expected OverwriteByExpression in parsed plan:\n$parsedPlan"))
+ assert(overwrite.table.isInstanceOf[PlanWithUnresolvedIdentifier],
+ s"Expected OverwriteByExpression.table to be PlanWithUnresolvedIdentifier, " +
+ s"got ${overwrite.table.getClass.getSimpleName}:\n$parsedPlan")
+ // After CTESubstitution runs, the CTE defs should land on the command's children (because
+ // OverwriteByExpression is a CTEInChildren) -- never as `WithCTE(OverwriteByExpression, _)`.
+ val substituted = CTESubstitution.apply(parsedPlan)
+ substituted.foreach {
+ case WithCTE(_: CTEInChildren, _) =>
+ fail(s"Found invalid WithCTE(CTEInChildren, _) shape after CTESubstitution:\n$substituted")
+ case _ =>
+ }
+ }
+
+ // SPARK-46625: Parameter inside `IDENTIFIER(:p)` on REPLACE WHERE lives in
+ // `OverwriteByExpression.table`, which is a non-child slot. Verify that
+ // `BindParameters.bind` reaches into the slot via the explicit `OverwriteByExpression`
+ // recursion (parameters.scala) and that the `getDefaultTreePatternBits` override on
+ // `OverwriteByExpression` exposes the PARAMETER bit for pruning. Done at the rule level
+ // because driving REPLACE WHERE through full analysis would require a v2 catalog.
+ test("SPARK-46625: BindParameters recurses into OverwriteByExpression.table") {
+ val parsedPlan = spark.sessionState.sqlParser.parsePlan(
+ """INSERT INTO IDENTIFIER(:tname) REPLACE WHERE a = 10
+ |SELECT 1 AS a""".stripMargin)
+ val overwrite = parsedPlan.collectFirst { case o: OverwriteByExpression => o }.getOrElse(
+ fail(s"Expected OverwriteByExpression in parsed plan:\n$parsedPlan"))
+ // Pruning prerequisite: the PARAMETER bit must be visible at the OverwriteByExpression
+ // level (it lives inside `table`, which is not a child); this exercises the
+ // `getDefaultTreePatternBits` override.
+ assert(overwrite.containsPattern(PARAMETER),
+ "OverwriteByExpression.getDefaultTreePatternBits must propagate `table`'s PARAMETER bit")
+
+ val bound = BindParameters.apply(
+ NameParameterizedQuery(parsedPlan, Seq("tname"), Seq(Literal("foo_table"))))
+ val boundOverwrite = bound.collectFirst { case o: OverwriteByExpression => o }.getOrElse(
+ fail(s"Expected OverwriteByExpression in bound plan:\n$bound"))
+ assert(!boundOverwrite.table.containsPattern(PARAMETER),
+ s"Expected :tname inside OverwriteByExpression.table to be bound, got:\n$boundOverwrite")
+ }
+
+ // SPARK-46625: `CacheTableAsSelect.tempViewName` is an `Expression` slot, so an
+ // `IDENTIFIER(<non-literal>)` produces an `ExpressionWithUnresolvedIdentifier` there instead of
+ // wrapping the entire command in a `PlanWithUnresolvedIdentifier`. Verify on the parsed plan
+ // that the name slot holds the expression placeholder and no `WithCTE(CTEInChildren, _)` shape
+ // survives `CTESubstitution` (running the cache through full analysis would require the temp
+ // view machinery, so this is a parser-level test).
+ test("SPARK-46625: CACHE TABLE IDENTIFIER(...) AS WITH ... SELECT ... parser") {
+ val parsedPlan = spark.sessionState.sqlParser.parsePlan(
+ """CACHE TABLE IDENTIFIER('some' || '_view') AS
+ |WITH transformation AS (SELECT 4 AS a)
+ |SELECT * FROM transformation""".stripMargin)
+ val ctas = parsedPlan.collectFirst { case c: CacheTableAsSelect => c }.getOrElse(
+ fail(s"Expected CacheTableAsSelect in parsed plan:\n$parsedPlan"))
+ assert(ctas.tempViewName.isInstanceOf[ExpressionWithUnresolvedIdentifier],
+ s"Expected CacheTableAsSelect.tempViewName to be ExpressionWithUnresolvedIdentifier, " +
+ s"got ${ctas.tempViewName.getClass.getSimpleName}:\n$parsedPlan")
+ val substituted = CTESubstitution.apply(parsedPlan)
+ substituted.foreach {
+ case WithCTE(_: CTEInChildren, _) =>
+ fail(s"Found invalid WithCTE(CTEInChildren, _) shape after CTESubstitution:\n$substituted")
+ case _ =>
+ }
+ }
+
+ // SPARK-46625: Regression for the `if c.tempViewName.resolved` guard in CheckAnalysis. When
+ // the IDENTIFIER expression itself fails to resolve (e.g. references an unresolved column),
+ // the guard skips the invariant-validation case so the catch-all `LogicalPlan` case can
+ // produce `UNRESOLVED_COLUMN`. Without the guard, the invariant case would pre-empt this
+ // path and throw a `SparkException internal error` instead.
+ test("SPARK-46625: CACHE TABLE IDENTIFIER(<unresolved-col>) reports UNRESOLVED_COLUMN") {
+ val ex = intercept[AnalysisException] {
+ spark.sql("CACHE TABLE IDENTIFIER(unresolved_col) AS SELECT 1 AS a")
+ }
+ assert(ex.getCondition != null && ex.getCondition.startsWith("UNRESOLVED_COLUMN"),
+ s"Expected UNRESOLVED_COLUMN.*, got ${ex.getCondition}: ${ex.getMessage}")
+ assert(!ex.getMessage.contains("CacheTableAsSelect.tempViewName must be"),
+ s"Internal-error message leaked into user-facing error: ${ex.getMessage}")
+ }
+
+ // SPARK-46625: End-to-end CACHE TABLE IDENTIFIER(:p) AS WITH ... SELECT ... -- exercises the
+ // `tempViewNameString` extraction in `DataSourceV2Strategy` and the `CheckAnalysis` invariant
+ // case for `CacheTableAsSelect.tempViewName`. The parser-level test above already verifies
+ // the placement and CTE shape; this one drives the full analysis + execution path.
+ test("SPARK-46625: CACHE TABLE IDENTIFIER(:p) AS WITH ... SELECT ...") {
+ withTempView("t_cte_cache") {
+ val df = spark.sql(
+ """CACHE TABLE IDENTIFIER(:tname) AS
+ |WITH transformation AS (SELECT 21 AS a)
+ |SELECT * FROM transformation""".stripMargin,
+ Map("tname" -> "t_cte_cache"))
+ assertNoWithCTEAroundCTEInChildren(df)
+ checkAnswer(spark.table("t_cte_cache"), Row(21))
+ }
+ }
+
+ // SPARK-46625: RTAS mirrors CTAS -- the placeholder goes into `ReplaceTableAsSelect.name`
+ // at parse time. Verify on the parsed plan that the placeholder lives in that slot and that
+ // no `WithCTE(CTEInChildren, _)` shape survives `CTESubstitution`. Running RTAS through full
+ // analysis would require a v2 catalog, so this is a parser-level test.
+ test("SPARK-46625: REPLACE TABLE IDENTIFIER(...) AS WITH ... SELECT ... parser") {
+ // Use a non-literal-string expression so `withIdentClause` produces
+ // `PlanWithUnresolvedIdentifier` rather than short-circuiting to `UnresolvedIdentifier`.
+ val parsedPlan = spark.sessionState.sqlParser.parsePlan(
+ """REPLACE TABLE IDENTIFIER('some' || '_table') USING PARQUET AS
+ |WITH transformation AS (SELECT 5 AS a)
+ |SELECT * FROM transformation""".stripMargin)
+ val rtas = parsedPlan.collectFirst { case r: ReplaceTableAsSelect => r }.getOrElse(
+ fail(s"Expected ReplaceTableAsSelect in parsed plan:\n$parsedPlan"))
+ assert(rtas.name.isInstanceOf[PlanWithUnresolvedIdentifier],
+ s"Expected ReplaceTableAsSelect.name to be PlanWithUnresolvedIdentifier, " +
+ s"got ${rtas.name.getClass.getSimpleName}:\n$parsedPlan")
+ val substituted = CTESubstitution.apply(parsedPlan)
+ substituted.foreach {
+ case WithCTE(_: CTEInChildren, _) =>
+ fail(s"Found invalid WithCTE(CTEInChildren, _) shape after CTESubstitution:\n$substituted")
+ case _ =>
+ }
+ }
}