This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 407e79c0ac52 [SPARK-53924] Reload DSv2 tables in views created using 
plans on each view access
407e79c0ac52 is described below

commit 407e79c0ac52cb39561b9cb7af742e09b9f6c645
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Sat Nov 15 21:58:40 2025 -0800

    [SPARK-53924] Reload DSv2 tables in views created using plans on each view 
access
    
    ### What changes were proposed in this pull request?
    
    This PR makes Spark reload DSv2 tables referenced by views that store an
    analyzed plan (views created using plans) on each view access.
    
    ### Why are the changes needed?
    
    The current problem is that the view definition in the session catalog
    captures the analyzed plan, which references a `Table` instance (that
    instance is supposed to pin the table version). If a connector doesn't have
    an internal cache and produces a new `Table` object on each load, the table
    referenced in the view becomes orphaned, and there is no way to refresh it
    unless that `Table` instance auto-refreshes on each scan (which is very
    dangerous).
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but it restores the correct behavior without requiring hacks in 
connectors.
    
    ### How was this patch tested?
    
    This PR comes with tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52876 from aokolnychyi/spark-53924.
    
    Authored-by: Anton Okolnychyi <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |   9 +
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  24 ++-
 .../sql/catalyst/analysis/RelationResolution.scala |  39 ++++
 .../sql/catalyst/analysis/V2TableReference.scala   | 139 ++++++++++++
 .../spark/sql/connector/catalog/V2TableUtil.scala  |   2 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  26 +++
 .../apache/spark/sql/execution/CacheManager.scala  |   4 +
 .../apache/spark/sql/execution/command/views.scala |  14 +-
 .../org/apache/spark/sql/CachedTableSuite.scala    |  73 +++++++
 .../sql/connector/DataSourceV2DataFrameSuite.scala | 240 +++++++++++++++++++++
 10 files changed, 564 insertions(+), 6 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 326c0bf843e8..75ec6f9aec99 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -2128,6 +2128,15 @@
     ],
     "sqlState" : "42000"
   },
+  "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION" : {
+    "message" : [
+      "View <viewName> plan references table <tableName> whose <colType> 
columns changed since the view plan was initially captured.",
+      "Column changes:",
+      "<errors>",
+      "This indicates the table has evolved and the view based on the plan 
must be recreated."
+    ],
+    "sqlState" : "51024"
+  },
   "INCOMPATIBLE_COLUMN_TYPE" : {
     "message" : [
       "<operator> can only be performed on tables with compatible column 
types. The <columnOrdinalNumber> column of the <tableOrdinalNumber> table is 
<dataType1> type which is not compatible with <dataType2> at the same column of 
the first table.<hint>."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f9e14cb0daf8..6b0665c1b7f3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1185,6 +1185,18 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
       }
     }
 
+    private def resolveAsV2Relation(plan: LogicalPlan): 
Option[DataSourceV2Relation] = {
+      plan match {
+        case ref: V2TableReference =>
+          EliminateSubqueryAliases(relationResolution.resolveReference(ref)) 
match {
+            case r: DataSourceV2Relation => Some(r)
+            case _ => None
+          }
+        case r: DataSourceV2Relation => Some(r)
+        case _ => None
+      }
+    }
+
     def apply(plan: LogicalPlan)
         : LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, 
ruleId) {
       case i @ InsertIntoStatement(table, _, _, _, _, _, _) =>
@@ -1210,13 +1222,14 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
             resolveRelation(u).map(unwrapRelationPlan).map {
               case v: View => throw 
QueryCompilationErrors.writeIntoViewNotAllowedError(
                 v.desc.identifier, write)
-              case r: DataSourceV2Relation => write.withNewTable(r)
               case u: UnresolvedCatalogRelation =>
                 throw QueryCompilationErrors.writeIntoV1TableNotAllowedError(
                   u.tableMeta.identifier, write)
-              case other =>
-                throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(
-                  u.multipartIdentifier.quoted)
+              case plan =>
+                resolveAsV2Relation(plan).map(write.withNewTable).getOrElse {
+                  throw 
QueryCompilationErrors.writeIntoTempViewNotAllowedError(
+                    u.multipartIdentifier.quoted)
+                }
             }.getOrElse(write)
           case _ => write
         }
@@ -1224,6 +1237,9 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
       case u: UnresolvedRelation =>
         resolveRelation(u).map(resolveViews(_, u.options)).getOrElse(u)
 
+      case r: V2TableReference =>
+        relationResolution.resolveReference(r)
+
       case r @ RelationTimeTravel(u: UnresolvedRelation, timestamp, version)
           if timestamp.forall(ts => ts.resolved && 
!SubqueryExpression.hasSubquery(ts)) =>
         val timeTravelSpec = TimeTravelSpec.create(timestamp, version, 
conf.sessionLocalTimeZone)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
index 6ff40da88ed1..c7b92bc2a9fe 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
@@ -225,6 +225,45 @@ class RelationResolution(override val catalogManager: 
CatalogManager)
     }
   }
 
+  def resolveReference(ref: V2TableReference): LogicalPlan = {
+    val relation = getOrLoadRelation(ref)
+    val planId = ref.getTagValue(LogicalPlan.PLAN_ID_TAG)
+    cloneWithPlanId(relation, planId)
+  }
+
+  private def getOrLoadRelation(ref: V2TableReference): LogicalPlan = {
+    val key = toCacheKey(ref.catalog, ref.identifier)
+    relationCache.get(key) match {
+      case Some(cached) =>
+        adaptCachedRelation(cached, ref)
+      case None =>
+        val relation = loadRelation(ref)
+        relationCache.update(key, relation)
+        relation
+    }
+  }
+
+  private def loadRelation(ref: V2TableReference): LogicalPlan = {
+    val table = ref.catalog.loadTable(ref.identifier)
+    V2TableReferenceUtils.validateLoadedTable(table, ref)
+    val tableName = ref.identifier.toQualifiedNameParts(ref.catalog)
+    SubqueryAlias(tableName, ref.toRelation(table))
+  }
+
+  private def adaptCachedRelation(cached: LogicalPlan, ref: V2TableReference): 
LogicalPlan = {
+    cached transform {
+      case r: DataSourceV2Relation if matchesReference(r, ref) =>
+        V2TableReferenceUtils.validateLoadedTable(r.table, ref)
+        r.copy(output = ref.output, options = ref.options)
+    }
+  }
+
+  private def matchesReference(
+      relation: DataSourceV2Relation,
+      ref: V2TableReference): Boolean = {
+    relation.catalog.contains(ref.catalog) && 
relation.identifier.contains(ref.identifier)
+  }
+
   private def isResolvingView: Boolean = 
AnalysisContext.get.catalogAndNamespace.nonEmpty
 
   private def isReferredTempViewName(nameParts: Seq[String]): Boolean = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala
new file mode 100644
index 000000000000..b6a2c6db6604
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.analysis.V2TableReference.Context
+import org.apache.spark.sql.catalyst.analysis.V2TableReference.TableInfo
+import 
org.apache.spark.sql.catalyst.analysis.V2TableReference.TemporaryViewContext
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
+import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+import org.apache.spark.sql.connector.catalog.Column
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.MetadataColumn
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.connector.catalog.V2TableUtil
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.ArrayImplicits._
+
+/**
+ * A reference to a V2 table.
+ *
+ * References are placeholders for the latest table metadata and are replaced 
with actual table
+ * versions during analysis, allowing Spark to reload tables with up-to-date 
metadata. The newly
+ * loaded table metadata is validated against the original metadata depending 
on the context.
+ * For instance, temporary views with fully resolved logical plans don't allow 
schema changes
+ * in underlying tables.
+ */
+private[sql] case class V2TableReference private(
+    catalog: TableCatalog,
+    identifier: Identifier,
+    options: CaseInsensitiveStringMap,
+    info: TableInfo,
+    output: Seq[AttributeReference],
+    context: Context)
+  extends LeafNode with MultiInstanceRelation with NamedRelation {
+
+  override def name: String = V2TableUtil.toQualifiedName(catalog, identifier)
+
+  override def newInstance(): V2TableReference = {
+    copy(output = output.map(_.newInstance()))
+  }
+
+  override def computeStats(): Statistics = Statistics.DUMMY
+
+  override def simpleString(maxFields: Int): String = {
+    val outputString = truncatedString(output, "[", ", ", "]", maxFields)
+    s"TableReference$outputString $name"
+  }
+
+  def toRelation(table: Table): DataSourceV2Relation = {
+    DataSourceV2Relation(table, output, Some(catalog), Some(identifier), 
options)
+  }
+}
+
+private[sql] object V2TableReference {
+
+  case class TableInfo(
+      columns: Seq[Column],
+      metadataColumns: Seq[MetadataColumn])
+
+  sealed trait Context
+  case class TemporaryViewContext(viewName: Seq[String]) extends Context
+
+  def createForTempView(relation: DataSourceV2Relation, viewName: 
Seq[String]): V2TableReference = {
+    create(relation, TemporaryViewContext(viewName))
+  }
+
+  private def create(relation: DataSourceV2Relation, context: Context): 
V2TableReference = {
+    val ref = V2TableReference(
+      relation.catalog.get.asTableCatalog,
+      relation.identifier.get,
+      relation.options,
+      TableInfo(
+        columns = relation.table.columns.toImmutableArraySeq,
+        metadataColumns = V2TableUtil.extractMetadataColumns(relation)),
+      relation.output,
+      context)
+    ref.copyTagsFrom(relation)
+    ref
+  }
+}
+
+private[sql] object V2TableReferenceUtils extends SQLConfHelper {
+
+  def validateLoadedTable(table: Table, ref: V2TableReference): Unit = {
+    ref.context match {
+      case ctx: TemporaryViewContext =>
+        validateLoadedTableInTempView(table, ref, ctx)
+      case ctx =>
+        throw SparkException.internalError(s"Unknown table ref context: 
${ctx.getClass.getName}")
+    }
+  }
+
+  private def validateLoadedTableInTempView(
+      table: Table,
+      ref: V2TableReference,
+      ctx: TemporaryViewContext): Unit = {
+    val tableName = ref.identifier.toQualifiedNameParts(ref.catalog)
+
+    val dataErrors = V2TableUtil.validateCapturedColumns(table, 
ref.info.columns)
+    if (dataErrors.nonEmpty) {
+      throw QueryCompilationErrors.columnsChangedAfterViewWithPlanCreation(
+        ctx.viewName,
+        tableName,
+        dataErrors)
+    }
+
+    val metaErrors = V2TableUtil.validateCapturedMetadataColumns(table, 
ref.info.metadataColumns)
+    if (metaErrors.nonEmpty) {
+      throw 
QueryCompilationErrors.metadataColumnsChangedAfterViewWithPlanCreation(
+        ctx.viewName,
+        tableName,
+        metaErrors)
+    }
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala
index 9873fc0881a0..fa8ed1b21a31 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala
@@ -97,7 +97,7 @@ private[sql] object V2TableUtil extends SQLConfHelper {
   }
 
   // extracts original column info for all metadata attributes in relation
-  private def extractMetadataColumns(relation: DataSourceV2Relation): 
Seq[MetadataColumn] = {
+  def extractMetadataColumns(relation: DataSourceV2Relation): 
Seq[MetadataColumn] = {
     val metaAttrs = relation.output.filter(_.isMetadataCol)
     if (metaAttrs.nonEmpty) {
       val metaCols = metadataColumns(relation.table)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 29b87e2d0096..f741c5c3975e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -4466,4 +4466,30 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
       )
     )
   }
+
+  def columnsChangedAfterViewWithPlanCreation(
+      viewName: Seq[String],
+      tableName: Seq[String],
+      errors: Seq[String]): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+      messageParameters = Map(
+        "viewName" -> toSQLId(viewName),
+        "tableName" -> toSQLId(tableName),
+        "colType" -> "data",
+        "errors" -> errors.mkString("\n- ", "\n- ", "")))
+  }
+
+  def metadataColumnsChangedAfterViewWithPlanCreation(
+      viewName: Seq[String],
+      tableName: Seq[String],
+      errors: Seq[String]): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+      messageParameters = Map(
+        "viewName" -> toSQLId(viewName),
+        "tableName" -> toSQLId(tableName),
+        "colType" -> "metadata",
+        "errors" -> errors.mkString("\n- ", "\n- ", "")))
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 671fcb765648..3944cf818895 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.internal.{Logging, MessageWithContext}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.analysis.V2TableReference
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
SubqueryExpression}
 import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
@@ -250,6 +251,9 @@ class CacheManager extends Logging with 
AdaptiveSparkPlanHelper {
         val nameInCache = v2Ident.toQualifiedNameParts(catalog)
         isSameName(nameInCache) && (includeTimeTravel || 
timeTravelSpec.isEmpty)
 
+      case r: V2TableReference =>
+        isSameName(r.identifier.toQualifiedNameParts(r.catalog))
+
       case v: View =>
         isSameName(v.desc.identifier.nameParts)
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 514b64f6abed..11ec17ca57fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -27,6 +27,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CapturesConfig, SQLConfHelper, 
TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, 
GlobalTempView, LocalTempView, SchemaEvolution, SchemaUnsupported, 
ViewSchemaMode, ViewType}
+import org.apache.spark.sql.catalyst.analysis.V2TableReference
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType, TemporaryViewRelation}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
SubqueryExpression, VariableReference}
 import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, 
CreateTempView, CTEInChildren, CTERelationDef, LogicalPlan, Project, View, 
WithCTE}
@@ -34,6 +35,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.classic.ClassicConversions.castToImpl
 import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.types.{MetadataBuilder, StructType}
 import org.apache.spark.sql.util.SchemaUtils
@@ -733,7 +735,17 @@ object ViewHelper extends SQLConfHelper with Logging with 
CapturesConfig {
     } else {
       TemporaryViewRelation(
         prepareTemporaryViewStoringAnalyzedPlan(name, aliasedPlan, 
defaultCollation),
-        Some(aliasedPlan))
+        Some(prepareTemporaryViewPlan(name, aliasedPlan)))
+    }
+  }
+
+  private def prepareTemporaryViewPlan(
+      viewName: TableIdentifier,
+      plan: LogicalPlan): LogicalPlan = {
+    plan transform {
+      case r: DataSourceV2Relation
+          if r.catalog.isDefined && r.identifier.isDefined && 
r.timeTravelSpec.isEmpty =>
+        V2TableReference.createForTempView(r, viewName.nameParts)
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 191587888ab8..91addd72ab2b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -2457,6 +2457,79 @@ class CachedTableSuite extends QueryTest with 
SQLTestUtils
     }
   }
 
+  test("SPARK-53924: insert into DSv2 table invalidates cache of SQL temp view 
(plan)") {
+    checkInsertInvalidatesCacheOfSQLTempView(storePlan = true)
+  }
+
+  test("SPARK-53924: insert into DSv2 table invalidates cache of SQL temp view 
(text)") {
+    checkInsertInvalidatesCacheOfSQLTempView(storePlan = false)
+  }
+
+  private def checkInsertInvalidatesCacheOfSQLTempView(storePlan: Boolean): 
Unit = {
+    val t = "testcat.tbl"
+    withTable(t, "v") {
+      withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> 
storePlan.toString) {
+        sql(s"CREATE TABLE $t (id int, data string) USING foo")
+        sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+        // create and cache SQL temp view
+        sql(s"CREATE TEMPORARY VIEW v AS SELECT id FROM $t")
+        sql("SELECT * FROM v").cache()
+
+        // verify view is cached
+        assertCached(sql("SELECT * FROM v"))
+        checkAnswer(sql("SELECT * FROM v"), Seq(Row(1), Row(2)))
+
+        // insert data into base table
+        sql(s"INSERT INTO $t VALUES (3, 'c'), (4, 'd')")
+
+        // verify cache was refreshed and will pick up new data
+        checkCacheLoading(sql(s"SELECT * FROM v"), isLoaded = false)
+
+        // verify view is recached correctly
+        assertCached(sql("SELECT * FROM v"))
+        checkAnswer(
+          sql("SELECT * FROM v"),
+          Seq(Row(1), Row(2), Row(3), Row(4)))
+      }
+    }
+  }
+
+  test("SPARK-53924: uncache DSv2 table uncaches SQL temp views (plan)") {
+    checkUncacheTableUncachesSQLTempView(storePlan = true)
+  }
+
+  test("SPARK-53924: uncache DSv2 table uncaches SQL temp views (text)") {
+    checkUncacheTableUncachesSQLTempView(storePlan = false)
+  }
+
+  private def checkUncacheTableUncachesSQLTempView(storePlan: Boolean): Unit = 
{
+    val t = "testcat.tbl"
+    withTable(t, "v") {
+      withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> 
storePlan.toString) {
+        sql(s"CREATE TABLE $t (id int, data string) USING foo")
+        sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+        // cache table
+        sql(s"CACHE TABLE $t")
+        assertCached(sql(s"SELECT * FROM $t"))
+        checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, "a"), Row(2, "b")))
+
+        // create and cache SQL temp view
+        sql(s"CREATE TEMPORARY VIEW v AS SELECT id FROM $t")
+        sql("SELECT * FROM v").cache()
+        assertCached(sql("SELECT * FROM v"))
+        checkAnswer(sql("SELECT * FROM v"), Seq(Row(1), Row(2)))
+
+        // uncache table must invalidate view cache (cascading)
+        sql(s"UNCACHE TABLE $t")
+
+        // verify view is not cached anymore
+        assertNotCached(sql("SELECT * FROM v"))
+      }
+    }
+  }
+
   test("uncache persistent table via catalog API") {
     withTable("tbl1") {
       sql("CREATE TABLE tbl1 (name STRING, age INT) USING parquet")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 94fc250f4f83..c59e624cb178 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -1352,6 +1352,246 @@ class DataSourceV2DataFrameSuite
     }
   }
 
+  test("SPARK-53924: temp view on DSv2 table detects added columns") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+
+      // create temp view using DataFrame API
+      spark.table(t).createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq.empty)
+
+      // add column to underlying table
+      sql(s"ALTER TABLE $t ADD COLUMN age int")
+
+      // accessing temp view should detect schema change
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
+        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "\n- `age` INT has been added"))
+    }
+  }
+
+  test("SPARK-53924: temp view on DSv2 table detects removed columns") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string, age int) USING foo")
+
+      // create temp view
+      spark.table(t).createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq.empty)
+
+      // drop column from underlying table
+      sql(s"ALTER TABLE $t DROP COLUMN age")
+
+      // accessing temp view should detect schema change
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
+        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "\n- `age` INT has been removed"))
+    }
+  }
+
+  test("SPARK-53924: temp view on DSv2 table detects nullability changes") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string NOT NULL) USING foo")
+
+      // create temp view
+      spark.table(t).createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq.empty)
+
+      // change nullability constraint using ALTER TABLE
+      sql(s"ALTER TABLE $t ALTER COLUMN data DROP NOT NULL")
+
+      // accessing temp view should detect schema change
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
+        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "\n- `data` type has changed from STRING NOT NULL to 
STRING"))
+    }
+  }
+
+  test("SPARK-53924: temp view on DSv2 table accepts table ID changes") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+      val df = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
+      df.write.insertInto(t)
+
+      // create temp view
+      spark.table(t).createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), df)
+
+      // capture the original table ID
+      val originalTableId = catalog("testcat").loadTable(ident).id
+
+      // drop and recreate table (this changes the table ID)
+      sql(s"DROP TABLE $t")
+      sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+
+      // verify table ID changed
+      val newTableId = catalog("testcat").loadTable(ident).id
+      assert(originalTableId != newTableId)
+
+      // accessing temp view should work despite table ID change (returns 
empty data)
+      checkAnswer(spark.table("v"), Seq.empty)
+
+      // insert new data and verify view reflects it
+      val newDF = Seq((3L, "c"), (4L, "d")).toDF("id", "data")
+      newDF.write.insertInto(t)
+      checkAnswer(spark.table("v"), newDF)
+    }
+  }
+
+  test("SPARK-53924: createOrReplaceTempView works after schema change") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint) USING foo")
+
+      spark.table(t).createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq.empty)
+
+      // alter table
+      sql(s"ALTER TABLE $t ADD COLUMN data string")
+
+      // old view fails
+      intercept[AnalysisException] { spark.table("v").collect() }
+
+      // recreate view with updated schema
+      spark.table(t).createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq.empty)
+
+      // now it should work with new schema
+      val df = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
+      df.write.insertInto(t)
+      checkAnswer(spark.table("v"), df)
+    }
+  }
+
+
+  test("SPARK-53924: temp view on DSv2 table with read options") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+
+      // create temp view with options
+      val df = spark.read.option("fakeOption", "testValue").table(t)
+      df.createOrReplaceTempView("v")
+
+      // verify options are preserved in the view
+      val options = spark.table("v").queryExecution.analyzed.collectFirst {
+        case d: DataSourceV2Relation => d.options
+      }.get
+      assert(options.get("fakeOption") == "testValue")
+
+      // schema changes should still be detected
+      sql(s"ALTER TABLE $t ADD COLUMN age int")
+
+      // accessing temp view should detect schema change
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
+        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "\n- `age` INT has been added"))
+    }
+  }
+
+  test("SPARK-53924: temp view on DSv2 table created using SQL with plan 
detects changes") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> "true") {
+        sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+
+        // create temp view using SQL that should capture plan
+        sql(s"CREATE OR REPLACE TEMPORARY VIEW v AS SELECT * FROM $t")
+        checkAnswer(spark.table("v"), Seq.empty)
+
+        // verify that view stores analyzed plan
+        val Some(view) = spark.sessionState.catalog.getRawTempView("v")
+        assert(view.plan.isDefined)
+
+        // add column to underlying table
+        sql(s"ALTER TABLE $t ADD COLUMN age int")
+
+        // accessing temp view should detect schema change
+        checkError(
+          exception = intercept[AnalysisException] { 
spark.table("v").collect() },
+          condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+          parameters = Map(
+            "viewName" -> "`v`",
+            "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+            "colType" -> "data",
+            "errors" -> "\n- `age` INT has been added"))
+      }
+    }
+  }
+
+  test("SPARK-53924: temp view on DSv2 table detects VARCHAR/CHAR type 
changes") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, name VARCHAR(10)) USING foo")
+
+      // create temp view
+      spark.table(t).createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq.empty)
+
+      // change VARCHAR(10) to VARCHAR(20)
+      sql(s"ALTER TABLE $t ALTER COLUMN name TYPE VARCHAR(20)")
+
+      // accessing temp view should detect type change
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() 
},
+        condition = 
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "\n- `name` type has changed from VARCHAR(10) to 
VARCHAR(20)"))
+    }
+  }
+
+  test("SPARK-53924: temp view on DSv2 table works after inserting data") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+
+      // create temp view
+      spark.table(t).createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq.empty)
+
+      // insert data into underlying table (no schema change)
+      val df = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
+      df.write.insertInto(t)
+
+      // accessing temp view should work and reflect new data
+      checkAnswer(spark.table("v"), df)
+
+      // insert more data
+      val df2 = Seq((3L, "c"), (4L, "d")).toDF("id", "data")
+      df2.write.insertInto(t)
+
+      // view should reflect all data
+      checkAnswer(spark.table("v"), df.union(df2))
+    }
+  }
+
   private def pinTable(catalogName: String, ident: Identifier, version: 
String): Unit = {
     catalog(catalogName) match {
       case inMemory: BasicInMemoryTableCatalog => inMemory.pinTable(ident, 
version)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to