This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 407e79c0ac52 [SPARK-53924] Reload DSv2 tables in views created using
plans on each view access
407e79c0ac52 is described below
commit 407e79c0ac52cb39561b9cb7af742e09b9f6c645
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Sat Nov 15 21:58:40 2025 -0800
[SPARK-53924] Reload DSv2 tables in views created using plans on each view
access
### What changes were proposed in this pull request?
This PR makes Spark reload DSv2 tables in views created using plans on each
view access.
### Why are the changes needed?
The current problem is that the view definition in the session catalog
captures the analyzed plan that references `Table` (that is supposed to pin the
version). If a connector doesn’t have an internal cache and produces a new
`Table` object on each load, the table referenced in the view will become
orphaned and there will be no way to refresh it unless that `Table` instance
auto-refreshes on each scan (super dangerous).
### Does this PR introduce _any_ user-facing change?
Yes, but it restores the correct behavior without requiring hacks in
connectors.
### How was this patch tested?
This PR comes with tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52876 from aokolnychyi/spark-53924.
Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 9 +
.../spark/sql/catalyst/analysis/Analyzer.scala | 24 ++-
.../sql/catalyst/analysis/RelationResolution.scala | 39 ++++
.../sql/catalyst/analysis/V2TableReference.scala | 139 ++++++++++++
.../spark/sql/connector/catalog/V2TableUtil.scala | 2 +-
.../spark/sql/errors/QueryCompilationErrors.scala | 26 +++
.../apache/spark/sql/execution/CacheManager.scala | 4 +
.../apache/spark/sql/execution/command/views.scala | 14 +-
.../org/apache/spark/sql/CachedTableSuite.scala | 73 +++++++
.../sql/connector/DataSourceV2DataFrameSuite.scala | 240 +++++++++++++++++++++
10 files changed, 564 insertions(+), 6 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 326c0bf843e8..75ec6f9aec99 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -2128,6 +2128,15 @@
],
"sqlState" : "42000"
},
+ "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION" : {
+ "message" : [
+ "View <viewName> plan references table <tableName> whose <colType>
columns changed since the view plan was initially captured.",
+ "Column changes:",
+ "<errors>",
+ "This indicates the table has evolved and the view based on the plan
must be recreated."
+ ],
+ "sqlState" : "51024"
+ },
"INCOMPATIBLE_COLUMN_TYPE" : {
"message" : [
"<operator> can only be performed on tables with compatible column
types. The <columnOrdinalNumber> column of the <tableOrdinalNumber> table is
<dataType1> type which is not compatible with <dataType2> at the same column of
the first table.<hint>."
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f9e14cb0daf8..6b0665c1b7f3 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1185,6 +1185,18 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
}
}
+ private def resolveAsV2Relation(plan: LogicalPlan):
Option[DataSourceV2Relation] = {
+ plan match {
+ case ref: V2TableReference =>
+ EliminateSubqueryAliases(relationResolution.resolveReference(ref))
match {
+ case r: DataSourceV2Relation => Some(r)
+ case _ => None
+ }
+ case r: DataSourceV2Relation => Some(r)
+ case _ => None
+ }
+ }
+
def apply(plan: LogicalPlan)
: LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn,
ruleId) {
case i @ InsertIntoStatement(table, _, _, _, _, _, _) =>
@@ -1210,13 +1222,14 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
resolveRelation(u).map(unwrapRelationPlan).map {
case v: View => throw
QueryCompilationErrors.writeIntoViewNotAllowedError(
v.desc.identifier, write)
- case r: DataSourceV2Relation => write.withNewTable(r)
case u: UnresolvedCatalogRelation =>
throw QueryCompilationErrors.writeIntoV1TableNotAllowedError(
u.tableMeta.identifier, write)
- case other =>
- throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(
- u.multipartIdentifier.quoted)
+ case plan =>
+ resolveAsV2Relation(plan).map(write.withNewTable).getOrElse {
+ throw
QueryCompilationErrors.writeIntoTempViewNotAllowedError(
+ u.multipartIdentifier.quoted)
+ }
}.getOrElse(write)
case _ => write
}
@@ -1224,6 +1237,9 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
case u: UnresolvedRelation =>
resolveRelation(u).map(resolveViews(_, u.options)).getOrElse(u)
+ case r: V2TableReference =>
+ relationResolution.resolveReference(r)
+
case r @ RelationTimeTravel(u: UnresolvedRelation, timestamp, version)
if timestamp.forall(ts => ts.resolved &&
!SubqueryExpression.hasSubquery(ts)) =>
val timeTravelSpec = TimeTravelSpec.create(timestamp, version,
conf.sessionLocalTimeZone)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
index 6ff40da88ed1..c7b92bc2a9fe 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
@@ -225,6 +225,45 @@ class RelationResolution(override val catalogManager:
CatalogManager)
}
}
+ def resolveReference(ref: V2TableReference): LogicalPlan = {
+ val relation = getOrLoadRelation(ref)
+ val planId = ref.getTagValue(LogicalPlan.PLAN_ID_TAG)
+ cloneWithPlanId(relation, planId)
+ }
+
+ private def getOrLoadRelation(ref: V2TableReference): LogicalPlan = {
+ val key = toCacheKey(ref.catalog, ref.identifier)
+ relationCache.get(key) match {
+ case Some(cached) =>
+ adaptCachedRelation(cached, ref)
+ case None =>
+ val relation = loadRelation(ref)
+ relationCache.update(key, relation)
+ relation
+ }
+ }
+
+ private def loadRelation(ref: V2TableReference): LogicalPlan = {
+ val table = ref.catalog.loadTable(ref.identifier)
+ V2TableReferenceUtils.validateLoadedTable(table, ref)
+ val tableName = ref.identifier.toQualifiedNameParts(ref.catalog)
+ SubqueryAlias(tableName, ref.toRelation(table))
+ }
+
+ private def adaptCachedRelation(cached: LogicalPlan, ref: V2TableReference):
LogicalPlan = {
+ cached transform {
+ case r: DataSourceV2Relation if matchesReference(r, ref) =>
+ V2TableReferenceUtils.validateLoadedTable(r.table, ref)
+ r.copy(output = ref.output, options = ref.options)
+ }
+ }
+
+ private def matchesReference(
+ relation: DataSourceV2Relation,
+ ref: V2TableReference): Boolean = {
+ relation.catalog.contains(ref.catalog) &&
relation.identifier.contains(ref.identifier)
+ }
+
private def isResolvingView: Boolean =
AnalysisContext.get.catalogAndNamespace.nonEmpty
private def isReferredTempViewName(nameParts: Seq[String]): Boolean = {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala
new file mode 100644
index 000000000000..b6a2c6db6604
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.analysis.V2TableReference.Context
+import org.apache.spark.sql.catalyst.analysis.V2TableReference.TableInfo
+import
org.apache.spark.sql.catalyst.analysis.V2TableReference.TemporaryViewContext
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
+import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+import org.apache.spark.sql.connector.catalog.Column
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.MetadataColumn
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.connector.catalog.V2TableUtil
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.ArrayImplicits._
+
+/**
+ * A reference to a V2 table.
+ *
+ * References are placeholders for the latest table metadata and are replaced
with actual table
+ * versions during analysis, allowing Spark to reload tables with up-to-date
metadata. The newly
+ * loaded table metadata is validated against the original metadata depending
on the context.
+ * For instance, temporary views with fully resolved logical plans don't allow
schema changes
+ * in underlying tables.
+ */
+private[sql] case class V2TableReference private(
+ catalog: TableCatalog,
+ identifier: Identifier,
+ options: CaseInsensitiveStringMap,
+ info: TableInfo,
+ output: Seq[AttributeReference],
+ context: Context)
+ extends LeafNode with MultiInstanceRelation with NamedRelation {
+
+ override def name: String = V2TableUtil.toQualifiedName(catalog, identifier)
+
+ override def newInstance(): V2TableReference = {
+ copy(output = output.map(_.newInstance()))
+ }
+
+ override def computeStats(): Statistics = Statistics.DUMMY
+
+ override def simpleString(maxFields: Int): String = {
+ val outputString = truncatedString(output, "[", ", ", "]", maxFields)
+ s"TableReference$outputString $name"
+ }
+
+ def toRelation(table: Table): DataSourceV2Relation = {
+ DataSourceV2Relation(table, output, Some(catalog), Some(identifier),
options)
+ }
+}
+
+private[sql] object V2TableReference {
+
+ case class TableInfo(
+ columns: Seq[Column],
+ metadataColumns: Seq[MetadataColumn])
+
+ sealed trait Context
+ case class TemporaryViewContext(viewName: Seq[String]) extends Context
+
+ def createForTempView(relation: DataSourceV2Relation, viewName:
Seq[String]): V2TableReference = {
+ create(relation, TemporaryViewContext(viewName))
+ }
+
+ private def create(relation: DataSourceV2Relation, context: Context):
V2TableReference = {
+ val ref = V2TableReference(
+ relation.catalog.get.asTableCatalog,
+ relation.identifier.get,
+ relation.options,
+ TableInfo(
+ columns = relation.table.columns.toImmutableArraySeq,
+ metadataColumns = V2TableUtil.extractMetadataColumns(relation)),
+ relation.output,
+ context)
+ ref.copyTagsFrom(relation)
+ ref
+ }
+}
+
+private[sql] object V2TableReferenceUtils extends SQLConfHelper {
+
+ def validateLoadedTable(table: Table, ref: V2TableReference): Unit = {
+ ref.context match {
+ case ctx: TemporaryViewContext =>
+ validateLoadedTableInTempView(table, ref, ctx)
+ case ctx =>
+ throw SparkException.internalError(s"Unknown table ref context:
${ctx.getClass.getName}")
+ }
+ }
+
+ private def validateLoadedTableInTempView(
+ table: Table,
+ ref: V2TableReference,
+ ctx: TemporaryViewContext): Unit = {
+ val tableName = ref.identifier.toQualifiedNameParts(ref.catalog)
+
+ val dataErrors = V2TableUtil.validateCapturedColumns(table,
ref.info.columns)
+ if (dataErrors.nonEmpty) {
+ throw QueryCompilationErrors.columnsChangedAfterViewWithPlanCreation(
+ ctx.viewName,
+ tableName,
+ dataErrors)
+ }
+
+ val metaErrors = V2TableUtil.validateCapturedMetadataColumns(table,
ref.info.metadataColumns)
+ if (metaErrors.nonEmpty) {
+ throw
QueryCompilationErrors.metadataColumnsChangedAfterViewWithPlanCreation(
+ ctx.viewName,
+ tableName,
+ metaErrors)
+ }
+ }
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala
index 9873fc0881a0..fa8ed1b21a31 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala
@@ -97,7 +97,7 @@ private[sql] object V2TableUtil extends SQLConfHelper {
}
// extracts original column info for all metadata attributes in relation
- private def extractMetadataColumns(relation: DataSourceV2Relation):
Seq[MetadataColumn] = {
+ def extractMetadataColumns(relation: DataSourceV2Relation):
Seq[MetadataColumn] = {
val metaAttrs = relation.output.filter(_.isMetadataCol)
if (metaAttrs.nonEmpty) {
val metaCols = metadataColumns(relation.table)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 29b87e2d0096..f741c5c3975e 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -4466,4 +4466,30 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase with Compilat
)
)
}
+
+ def columnsChangedAfterViewWithPlanCreation(
+ viewName: Seq[String],
+ tableName: Seq[String],
+ errors: Seq[String]): Throwable = {
+ new AnalysisException(
+ errorClass = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ messageParameters = Map(
+ "viewName" -> toSQLId(viewName),
+ "tableName" -> toSQLId(tableName),
+ "colType" -> "data",
+ "errors" -> errors.mkString("\n- ", "\n- ", "")))
+ }
+
+ def metadataColumnsChangedAfterViewWithPlanCreation(
+ viewName: Seq[String],
+ tableName: Seq[String],
+ errors: Seq[String]): Throwable = {
+ new AnalysisException(
+ errorClass = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ messageParameters = Map(
+ "viewName" -> toSQLId(viewName),
+ "tableName" -> toSQLId(tableName),
+ "colType" -> "metadata",
+ "errors" -> errors.mkString("\n- ", "\n- ", "")))
+ }
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 671fcb765648..3944cf818895 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.{Logging, MessageWithContext}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.analysis.V2TableReference
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute,
SubqueryExpression}
import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
@@ -250,6 +251,9 @@ class CacheManager extends Logging with
AdaptiveSparkPlanHelper {
val nameInCache = v2Ident.toQualifiedNameParts(catalog)
isSameName(nameInCache) && (includeTimeTravel ||
timeTravelSpec.isEmpty)
+ case r: V2TableReference =>
+ isSameName(r.identifier.toQualifiedNameParts(r.catalog))
+
case v: View =>
isSameName(v.desc.identifier.nameParts)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 514b64f6abed..11ec17ca57fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -27,6 +27,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.{CapturesConfig, SQLConfHelper,
TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext,
GlobalTempView, LocalTempView, SchemaEvolution, SchemaUnsupported,
ViewSchemaMode, ViewType}
+import org.apache.spark.sql.catalyst.analysis.V2TableReference
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat,
CatalogTable, CatalogTableType, TemporaryViewRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
SubqueryExpression, VariableReference}
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand,
CreateTempView, CTEInChildren, CTERelationDef, LogicalPlan, Project, View,
WithCTE}
@@ -34,6 +35,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.classic.ClassicConversions.castToImpl
import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.types.{MetadataBuilder, StructType}
import org.apache.spark.sql.util.SchemaUtils
@@ -733,7 +735,17 @@ object ViewHelper extends SQLConfHelper with Logging with
CapturesConfig {
} else {
TemporaryViewRelation(
prepareTemporaryViewStoringAnalyzedPlan(name, aliasedPlan,
defaultCollation),
- Some(aliasedPlan))
+ Some(prepareTemporaryViewPlan(name, aliasedPlan)))
+ }
+ }
+
+ private def prepareTemporaryViewPlan(
+ viewName: TableIdentifier,
+ plan: LogicalPlan): LogicalPlan = {
+ plan transform {
+ case r: DataSourceV2Relation
+ if r.catalog.isDefined && r.identifier.isDefined &&
r.timeTravelSpec.isEmpty =>
+ V2TableReference.createForTempView(r, viewName.nameParts)
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 191587888ab8..91addd72ab2b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -2457,6 +2457,79 @@ class CachedTableSuite extends QueryTest with
SQLTestUtils
}
}
+ test("SPARK-53924: insert into DSv2 table invalidates cache of SQL temp view
(plan)") {
+ checkInsertInvalidatesCacheOfSQLTempView(storePlan = true)
+ }
+
+ test("SPARK-53924: insert into DSv2 table invalidates cache of SQL temp view
(text)") {
+ checkInsertInvalidatesCacheOfSQLTempView(storePlan = false)
+ }
+
+ private def checkInsertInvalidatesCacheOfSQLTempView(storePlan: Boolean):
Unit = {
+ val t = "testcat.tbl"
+ withTable(t, "v") {
+ withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key ->
storePlan.toString) {
+ sql(s"CREATE TABLE $t (id int, data string) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+ // create and cache SQL temp view
+ sql(s"CREATE TEMPORARY VIEW v AS SELECT id FROM $t")
+ sql("SELECT * FROM v").cache()
+
+ // verify view is cached
+ assertCached(sql("SELECT * FROM v"))
+ checkAnswer(sql("SELECT * FROM v"), Seq(Row(1), Row(2)))
+
+ // insert data into base table
+ sql(s"INSERT INTO $t VALUES (3, 'c'), (4, 'd')")
+
+ // verify cache was refreshed and will pick up new data
+ checkCacheLoading(sql(s"SELECT * FROM v"), isLoaded = false)
+
+ // verify view is recached correctly
+ assertCached(sql("SELECT * FROM v"))
+ checkAnswer(
+ sql("SELECT * FROM v"),
+ Seq(Row(1), Row(2), Row(3), Row(4)))
+ }
+ }
+ }
+
+ test("SPARK-53924: uncache DSv2 table uncaches SQL temp views (plan)") {
+ checkUncacheTableUncachesSQLTempView(storePlan = true)
+ }
+
+ test("SPARK-53924: uncache DSv2 table uncaches SQL temp views (text)") {
+ checkUncacheTableUncachesSQLTempView(storePlan = false)
+ }
+
+ private def checkUncacheTableUncachesSQLTempView(storePlan: Boolean): Unit =
{
+ val t = "testcat.tbl"
+ withTable(t, "v") {
+ withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key ->
storePlan.toString) {
+ sql(s"CREATE TABLE $t (id int, data string) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+ // cache table
+ sql(s"CACHE TABLE $t")
+ assertCached(sql(s"SELECT * FROM $t"))
+ checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, "a"), Row(2, "b")))
+
+ // create and cache SQL temp view
+ sql(s"CREATE TEMPORARY VIEW v AS SELECT id FROM $t")
+ sql("SELECT * FROM v").cache()
+ assertCached(sql("SELECT * FROM v"))
+ checkAnswer(sql("SELECT * FROM v"), Seq(Row(1), Row(2)))
+
+ // uncache table must invalidate view cache (cascading)
+ sql(s"UNCACHE TABLE $t")
+
+ // verify view is not cached anymore
+ assertNotCached(sql("SELECT * FROM v"))
+ }
+ }
+ }
+
test("uncache persistent table via catalog API") {
withTable("tbl1") {
sql("CREATE TABLE tbl1 (name STRING, age INT) USING parquet")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 94fc250f4f83..c59e624cb178 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -1352,6 +1352,246 @@ class DataSourceV2DataFrameSuite
}
}
+ test("SPARK-53924: temp view on DSv2 table detects added columns") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+
+ // create temp view using DataFrame API
+ spark.table(t).createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq.empty)
+
+ // add column to underlying table
+ sql(s"ALTER TABLE $t ADD COLUMN age int")
+
+ // accessing temp view should detect schema change
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect()
},
+ condition =
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "\n- `age` INT has been added"))
+ }
+ }
+
+ test("SPARK-53924: temp view on DSv2 table detects removed columns") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id bigint, data string, age int) USING foo")
+
+ // create temp view
+ spark.table(t).createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq.empty)
+
+ // drop column from underlying table
+ sql(s"ALTER TABLE $t DROP COLUMN age")
+
+ // accessing temp view should detect schema change
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect()
},
+ condition =
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "\n- `age` INT has been removed"))
+ }
+ }
+
+ test("SPARK-53924: temp view on DSv2 table detects nullability changes") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id bigint, data string NOT NULL) USING foo")
+
+ // create temp view
+ spark.table(t).createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq.empty)
+
+ // change nullability constraint using ALTER TABLE
+ sql(s"ALTER TABLE $t ALTER COLUMN data DROP NOT NULL")
+
+ // accessing temp view should detect schema change
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect()
},
+ condition =
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "\n- `data` type has changed from STRING NOT NULL to
STRING"))
+ }
+ }
+
+ test("SPARK-53924: temp view on DSv2 table accepts table ID changes") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+ val df = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
+ df.write.insertInto(t)
+
+ // create temp view
+ spark.table(t).createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), df)
+
+ // capture the original table ID
+ val originalTableId = catalog("testcat").loadTable(ident).id
+
+ // drop and recreate table (this changes the table ID)
+ sql(s"DROP TABLE $t")
+ sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+
+ // verify table ID changed
+ val newTableId = catalog("testcat").loadTable(ident).id
+ assert(originalTableId != newTableId)
+
+ // accessing temp view should work despite table ID change (returns
empty data)
+ checkAnswer(spark.table("v"), Seq.empty)
+
+ // insert new data and verify view reflects it
+ val newDF = Seq((3L, "c"), (4L, "d")).toDF("id", "data")
+ newDF.write.insertInto(t)
+ checkAnswer(spark.table("v"), newDF)
+ }
+ }
+
+ test("SPARK-53924: createOrReplaceTempView works after schema change") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id bigint) USING foo")
+
+ spark.table(t).createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq.empty)
+
+ // alter table
+ sql(s"ALTER TABLE $t ADD COLUMN data string")
+
+ // old view fails
+ intercept[AnalysisException] { spark.table("v").collect() }
+
+ // recreate view with updated schema
+ spark.table(t).createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq.empty)
+
+ // now it should work with new schema
+ val df = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
+ df.write.insertInto(t)
+ checkAnswer(spark.table("v"), df)
+ }
+ }
+
+
+ test("SPARK-53924: temp view on DSv2 table with read options") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+
+ // create temp view with options
+ val df = spark.read.option("fakeOption", "testValue").table(t)
+ df.createOrReplaceTempView("v")
+
+ // verify options are preserved in the view
+ val options = spark.table("v").queryExecution.analyzed.collectFirst {
+ case d: DataSourceV2Relation => d.options
+ }.get
+ assert(options.get("fakeOption") == "testValue")
+
+ // schema changes should still be detected
+ sql(s"ALTER TABLE $t ADD COLUMN age int")
+
+ // accessing temp view should detect schema change
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect()
},
+ condition =
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "\n- `age` INT has been added"))
+ }
+ }
+
+ test("SPARK-53924: temp view on DSv2 table created using SQL with plan
detects changes") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> "true") {
+ sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+
+ // create temp view using SQL that should capture plan
+ sql(s"CREATE OR REPLACE TEMPORARY VIEW v AS SELECT * FROM $t")
+ checkAnswer(spark.table("v"), Seq.empty)
+
+ // verify that view stores analyzed plan
+ val Some(view) = spark.sessionState.catalog.getRawTempView("v")
+ assert(view.plan.isDefined)
+
+ // add column to underlying table
+ sql(s"ALTER TABLE $t ADD COLUMN age int")
+
+ // accessing temp view should detect schema change
+ checkError(
+ exception = intercept[AnalysisException] {
spark.table("v").collect() },
+ condition =
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "\n- `age` INT has been added"))
+ }
+ }
+ }
+
+ test("SPARK-53924: temp view on DSv2 table detects VARCHAR/CHAR type
changes") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id bigint, name VARCHAR(10)) USING foo")
+
+ // create temp view
+ spark.table(t).createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq.empty)
+
+ // change VARCHAR(10) to VARCHAR(20)
+ sql(s"ALTER TABLE $t ALTER COLUMN name TYPE VARCHAR(20)")
+
+ // accessing temp view should detect type change
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect()
},
+ condition =
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "\n- `name` type has changed from VARCHAR(10) to
VARCHAR(20)"))
+ }
+ }
+
+ test("SPARK-53924: temp view on DSv2 table works after inserting data") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+
+ // create temp view
+ spark.table(t).createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq.empty)
+
+ // insert data into underlying table (no schema change)
+ val df = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
+ df.write.insertInto(t)
+
+ // accessing temp view should work and reflect new data
+ checkAnswer(spark.table("v"), df)
+
+ // insert more data
+ val df2 = Seq((3L, "c"), (4L, "d")).toDF("id", "data")
+ df2.write.insertInto(t)
+
+ // view should reflect all data
+ checkAnswer(spark.table("v"), df.union(df2))
+ }
+ }
+
private def pinTable(catalogName: String, ident: Identifier, version:
String): Unit = {
catalog(catalogName) match {
case inMemory: BasicInMemoryTableCatalog => inMemory.pinTable(ident,
version)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]