This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new 0fa4c0c69f81 [SPARK-56643][SQL][TESTS] Add DSv2 temp view with stored plan tests
0fa4c0c69f81 is described below
commit 0fa4c0c69f817c9fdc22ca5e16fddcdb2668defa
Author: Thang Long Vu <[email protected]>
AuthorDate: Wed May 20 21:25:41 2026 +0800
[SPARK-56643][SQL][TESTS] Add DSv2 temp view with stored plan tests
### What changes were proposed in this pull request?
Add new tests to `DataSourceV2DataFrameSuite` that verify temporary view
behavior with stored plans when the underlying DSv2 table changes.
Each scenario tests three sub-cases:
- **Session write**: Table modification via SQL (same session).
- **External write**: Table modification via the catalog API (simulates
writes from outside the session).
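- **External write with connector cache**: Same as external write, but the
table lives in `CachingInMemoryTableCatalog`, a catalog that caches `loadTable`
results, so the external change stays invisible to the temp view until
`REFRESH TABLE`.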
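The tests below exercise the cached sub-case against a caching connector (`cachingcat`). The following minimal, Spark-free Scala sketch shows why such a connector hides an external write until its cache entry is invalidated. It is a model under assumptions: `BackingStore`, `CachingCatalog`, `TableSnapshot`, and `demo` are hypothetical names, and the `computeIfAbsent`/`remove` pattern is only assumed to be a fair stand-in for `CachingInMemoryTableCatalog`. It is not that class's code.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, Spark-free model of a caching connector.
// A "table" is an immutable snapshot of (id, salary) rows.
object CachingCatalogModel {
  final case class TableSnapshot(rows: Vector[(Int, Int)])

  // Source of truth that session and external writers update.
  final class BackingStore {
    private val tables = new ConcurrentHashMap[String, TableSnapshot]()
    def load(name: String): TableSnapshot = tables.get(name)
    def put(name: String, table: TableSnapshot): Unit = tables.put(name, table)
  }

  // Serves the first snapshot loaded for a name until the entry is
  // invalidated (similar in spirit to the cached loadTable overload).
  final class CachingCatalog(store: BackingStore) {
    private val cache = new ConcurrentHashMap[String, TableSnapshot]()
    def loadTable(name: String): TableSnapshot =
      cache.computeIfAbsent(name, n => store.load(n))
    // Stands in for invalidateTable, which REFRESH TABLE invokes.
    def invalidateTable(name: String): Unit = cache.remove(name)
  }

  // Initial load, external append, then refresh.
  // Returns the row counts seen at each step.
  def demo(): (Int, Int, Int) = {
    val store = new BackingStore
    store.put("tbl", TableSnapshot(Vector((1, 100), (10, 1000))))
    val catalog = new CachingCatalog(store)
    val before = catalog.loadTable("tbl").rows.size
    // External writer replaces the stored snapshot, bypassing the cache.
    store.put("tbl", TableSnapshot(Vector((1, 100), (10, 1000), (2, 200))))
    val stale = catalog.loadTable("tbl").rows.size
    catalog.invalidateTable("tbl")
    val refreshed = catalog.loadTable("tbl").rows.size
    (before, stale, refreshed)
  }
}
```

`demo()` returns `(2, 2, 3)`. There are two rows before the external append, still two from the stale cache, and three after invalidation. This mirrors the `REFRESH TABLE` step in the "connector w/ cache" tests.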
**Scenarios:**
1. **Insert new data** (1.1 session, 1.2 external, 1.3 external+cache):
Temp view with filter reflects new data.
2. **ADD COLUMN** (2.1 session, 2.2 external, 2.3 external+cache): Temp
view preserves original schema after ADD COLUMN.
3. **DROP COLUMN** (3.1 session, 3.2 external, 3.3 external+cache): Temp
view detects column removal.
4. **Drop and recreate table** (4.1 session, 4.2 external, 4.3
external+cache): Temp view resolves to recreated table.
5. **Drop and re-add column with same type** (5.1 session, 5.2 external,
5.3 external+cache): Schema validation passes and the view keeps working;
existing rows read the re-added column as NULL.
6. **Drop and re-add column with different type** (6.1 session, 6.2
external, 6.3 external+cache): Temp view detects type change.
7. **Type widening** (7.1 session, 7.2 external, 7.3 external+cache): Temp
view detects INT to BIGINT type change.
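Scenarios 2, 3, 5, 6, and 7 follow one rule: every column captured by the stored plan must still exist with the same type, and extra columns are ignored. The sketch below is a simplified, hypothetical model of that rule. It ignores nullability, nested fields, and case sensitivity. `StoredPlanSchemaCheck`, `Col`, and `capturedSchema` are illustrative names. Only the message wording is borrowed from the expected errors in the tests; the check itself is not Spark's implementation.

```scala
// Hypothetical model of the compatibility check applied to a temp view
// whose stored plan captured `captured`, against the table's `current` schema.
object StoredPlanSchemaCheck {
  final case class Col(name: String, sqlType: String)

  // Schema captured when the view `v` was created in the tests.
  val capturedSchema: Seq[Col] = Seq(Col("id", "INT"), Col("salary", "INT"))

  def incompatibilities(captured: Seq[Col], current: Seq[Col]): Seq[String] = {
    val currentByName = current.map(c => c.name -> c).toMap
    captured.flatMap { c =>
      currentByName.get(c.name) match {
        case None =>
          Some(s"- `${c.name}` ${c.sqlType} has been removed")
        case Some(now) if now.sqlType != c.sqlType =>
          Some(s"- `${c.name}` type has changed from ${c.sqlType} to ${now.sqlType}")
        case Some(_) =>
          None // same name and type: compatible, e.g. drop + re-add same type
      }
    }
  }
}
```

Adding `new_column` produces no errors. Dropping `salary` produces ``- `salary` INT has been removed``. Re-adding `salary` as `STRING` or widening it to `BIGINT` produces a type-change entry.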
Each test creates a DSv2 table, inserts initial data, builds a temp view
with a filter (`salary < 999`) to demonstrate stored plan behavior, and
verifies the view after table modifications.
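The `salary < 999` filter also makes scenario 5 observable. Once `salary` is dropped and re-added, existing rows read it as NULL. SQL evaluates `NULL < 999` to UNKNOWN, and a filter drops rows whose predicate is not TRUE. The following small pure-Scala model of that three-valued logic uses `Option[Int]` for a nullable column. `StoredFilterModel` and `SalaryRow` are hypothetical names used only for illustration.

```scala
// Hypothetical model of a stored filter re-evaluated over current table rows.
object StoredFilterModel {
  // None models a SQL NULL salary.
  final case class SalaryRow(id: Int, salary: Option[Int])

  // SQL `salary < bound`: None (UNKNOWN) when salary is NULL.
  def lessThan(salary: Option[Int], bound: Int): Option[Boolean] =
    salary.map(_ < bound)

  // A filter keeps a row only when the predicate is definitely TRUE.
  def filterLessThan(rows: Seq[SalaryRow], bound: Int): Seq[SalaryRow] =
    rows.filter(r => lessThan(r.salary, bound).contains(true))

  def filterIsNull(rows: Seq[SalaryRow]): Seq[SalaryRow] =
    rows.filter(_.salary.isEmpty)

  // Rows from the tests, before and after DROP COLUMN + ADD COLUMN salary INT.
  val initialRows: Seq[SalaryRow] = Seq(SalaryRow(1, Some(100)), SalaryRow(10, Some(1000)))
  val afterReAdd: Seq[SalaryRow] = initialRows.map(_.copy(salary = None))
}
```

On `afterReAdd`, `filterLessThan(_, 999)` is empty while `filterIsNull` returns both rows. Scenario 5 expects the same results from the `v` and `v_filter_is_null` views.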
#### Shared test infrastructure changes
- **`CachingInMemoryTableCatalog`** (new): A per-instance caching test
catalog that extends `InMemoryTableCatalog` and caches `loadTable` results in a
`ConcurrentHashMap`. It simulates Iceberg-style stale-cache behavior, where the
catalog keeps returning a cached `Table` object even after external writes
modify the underlying data.
- **`BasicInMemoryTableCatalog.alterTable`**: Fixed data-migration
semantics for combined DROP + ADD column changes. The intermediate schema
(after drops, before adds) is now passed to `alterTableWithData` so that old
column data is properly dropped rather than retained by name-matching.
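The `alterTable` fix can be illustrated with a toy model of name-based migration. This is a hypothetical sketch that relies only on the description above: old-row fields are kept when their names appear in the schema passed to `alterTableWithData`. `NameBasedMigration` and its members are illustrative names and do not reproduce the real `InMemoryBaseTable` code.

```scala
// Hypothetical model: rows are maps from column name to a nullable Int value.
object NameBasedMigration {
  type Row = Map[String, Option[Int]]

  // Mirrors the diff: remove deleted top-level names from the *old* schema.
  def namesAfterDrops(oldNames: Seq[String], dropped: Set[String]): Seq[String] =
    oldNames.filterNot(dropped)

  // Keep an old value only if its name is in keepNames; every column of the
  // final schema that was not kept becomes None (NULL).
  def migrate(rows: Seq[Row], keepNames: Seq[String], finalNames: Seq[String]): Seq[Row] =
    rows.map { row =>
      val kept = row.filter { case (name, _) => keepNames.contains(name) }
      finalNames.map(name => name -> kept.getOrElse(name, None)).toMap
    }

  // DROP COLUMN salary + ADD COLUMN salary INT in a single alterTable call.
  val oldNames: Seq[String] = Seq("id", "salary")
  val finalNames: Seq[String] = Seq("id", "salary")
  val oldRows: Seq[Row] = Seq(Map("id" -> Some(1), "salary" -> Some(100)))
}
```

Passing `finalNames` as the names to keep retains `salary -> Some(100)`, which reproduces the bug. Passing `namesAfterDrops(oldNames, Set("salary"))` keeps only `id`, so the re-added `salary` is `None` (NULL), which is what scenario 5.2 expects.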
### Why are the changes needed?
The existing SPARK-53924 tests in `DataSourceV2DataFrameSuite` cover basic
schema change detection for temp views on DSv2 tables but are missing:
- Filter-based stored plan tests
- External write variants (using catalog API to simulate writes from
outside the session)
- External write with cache variants
- Scenarios 5, 6, 7 (drop+re-add column same/different type, type widening)
### Does this PR introduce _any_ user-facing change?
No. This PR only adds tests and modifies test-only infrastructure.
### How was this patch tested?
New tests in `DataSourceV2DataFrameSuite`.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-6)
Closes #55540 from longvu-db/dsv2-classic-pr1-temp-views.
Authored-by: Thang Long Vu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit e7c0a3490ca80b56bd4b4a1982954c48763d94ea)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../catalog/CachingInMemoryTableCatalog.scala | 50 ++
.../connector/catalog/InMemoryTableCatalog.scala | 21 +-
.../sql/connector/DataSourceV2DataFrameSuite.scala | 620 ++++++++++++++++++++-
3 files changed, 665 insertions(+), 26 deletions(-)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala
new file mode 100644
index 000000000000..f8e3224fa7e1
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * An InMemoryTableCatalog that simulates a caching connector like
+ * Iceberg's CachingCatalog. On first [[loadTable]], returns a fresh
+ * copy. On subsequent loads, returns the CACHED (stale) copy,
+ * making external changes invisible.
+ *
+ * Session writes go through the write-variant [[loadTable]], which is not
+ * cached, so they modify the underlying table directly. Cached [[loadTable]]
+ * results may still be stale until [[clearCache]] or REFRESH TABLE (which
+ * invokes [[invalidateTable]]) is called.
+ *
+ * Only the primary [[loadTable(ident:org\.apache\.spark\.sql\.connector\.catalog\.Identifier)*]]
+ * overload is cached. Version and timestamp overloads bypass the cache, matching
+ * time-travel semantics. [[dropTable]], [[createTable]], and [[alterTable]] do not
+ * invalidate the cache, matching the behavior of real caching connectors.
+ */
+class CachingInMemoryTableCatalog extends InMemoryTableCatalog {
+ private val cachedTables = new ConcurrentHashMap[Identifier, Table]()
+
+ override def loadTable(ident: Identifier): Table =
+ cachedTables.computeIfAbsent(ident, _ => super.loadTable(ident))
+
+ override def invalidateTable(ident: Identifier): Unit = {
+ super.invalidateTable(ident)
+ cachedTables.remove(ident)
+ }
+
+ def clearCache(): Unit = cachedTables.clear()
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index 1654e9e9a66d..d39e422a9d9e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -179,6 +179,23 @@ class BasicInMemoryTableCatalog extends TableCatalog {
throw new IllegalArgumentException(s"Cannot drop all fields")
}
+ // Compute the intermediate schema that only reflects column deletions.
+ // [[InMemoryBaseTable.alterTableWithData]] decides which old-row fields to keep by
+ // matching names against its newSchema argument. Passing this post-drop schema
+ // (rather than the final schema that may re-add a same-named column) ensures that
+ // dropped column values are physically removed from existing data.
+ // Note: this only handles top-level column deletions. Nested column deletions
+ // would need additional handling, but [[alterTableWithData]] only filters by
+ // top-level field name anyway.
+ val deletedTopLevelNames = changes.collect {
+ case d: TableChange.DeleteColumn if d.fieldNames.length == 1 => d.fieldNames.head
+ }.toSet
+ val schemaAfterDrops = if (deletedTopLevelNames.nonEmpty) {
+ StructType(table.schema.fields.filterNot(f => deletedTopLevelNames(f.name)))
+ } else {
+ schema
+ }
+
table.increaseVersion()
val currentVersion = table.version()
val columnsWithIds = InMemoryBaseTable.assignMissingIds(
@@ -193,14 +210,14 @@ class BasicInMemoryTableCatalog extends TableCatalog {
properties = properties,
constraints = constraints,
id = table.id)
- .alterTableWithData(table.data, schema)
+ .alterTableWithData(table.data, schemaAfterDrops)
case _: InMemoryTableWithV2Filter =>
new InMemoryTableWithV2Filter(
name = table.name,
columns = columnsWithIds,
partitioning = finalPartitioning,
properties = properties)
- .alterTableWithData(table.data, schema)
+ .alterTableWithData(table.data, schemaAfterDrops)
case other =>
throw new UnsupportedOperationException(
s"Unsupported InMemoryBaseTable subclass: ${other.getClass.getName}")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index c532ef359a7c..4fc93609fb41 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -25,9 +25,10 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
-import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo, TypeChangeResetsColIdTableCatalog}
+import org.apache.spark.sql.connector.catalog.{BufferedRows, CachingInMemoryTableCatalog, CatalogV2Util, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo, TypeChangeResetsColIdTableCatalog}
import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue}
import org.apache.spark.sql.connector.catalog.TableChange
@@ -54,6 +55,9 @@ class DataSourceV2DataFrameSuite
.set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
.set("spark.sql.catalog.testcat.copyOnLoad", "true")
.set("spark.sql.catalog.testcat2", classOf[InMemoryTableCatalog].getName)
+ .set("spark.sql.catalog.cachingcat",
+ classOf[CachingInMemoryTableCatalog].getName)
+ .set("spark.sql.catalog.cachingcat.copyOnLoad", "true")
.set("spark.sql.catalog.nullidcat",
classOf[NullTableIdInMemoryTableCatalog].getName)
.set("spark.sql.catalog.nullidcat.copyOnLoad", "true")
@@ -71,6 +75,7 @@ class DataSourceV2DataFrameSuite
.set("spark.sql.catalog.composedidcat.copyOnLoad", "true")
after {
+ catalog("cachingcat").asInstanceOf[CachingInMemoryTableCatalog].clearCache()
spark.sessionState.catalogManager.reset()
}
@@ -2520,29 +2525,6 @@ class DataSourceV2DataFrameSuite
}
}
- test("temp view with stored plan after session drop and re-add column same type") {
- val t = "testcat.ns1.ns2.tbl"
- withTable(t) {
- sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
- sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
-
- // create two temp views with salary filters
- spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
- spark.table(t).filter("salary IS NULL").createOrReplaceTempView("v_null")
- checkAnswer(spark.table("v"), Seq(Row(1, 100)))
- checkAnswer(spark.table("v_null"), Seq.empty)
-
- // drop and re-add column with same name and type
- sql(s"ALTER TABLE $t DROP COLUMN salary")
- sql(s"ALTER TABLE $t ADD COLUMN salary INT")
-
- // salary values are now null, so the salary < 999 filter returns nothing
- checkAnswer(spark.table("v"), Seq.empty)
- // IS NULL filter now matches all rows
- checkAnswer(spark.table("v_null"), Seq(Row(1, null), Row(10, null)))
- }
- }
-
// Column ID tests: Write operations
//
// [[writeTo().append()]] eagerly executes the command during the
@@ -2723,6 +2705,7 @@ class DataSourceV2DataFrameSuite
// create temp view using DataFrame API
spark.table(t).createOrReplaceTempView("v")
+ assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
checkAnswer(spark.table("v"), Seq.empty)
// add top-level column to underlying table
@@ -2730,6 +2713,7 @@ class DataSourceV2DataFrameSuite
// accessing temp view should succeed as top-level column additions are allowed
// view captures original columns
+ assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
checkAnswer(spark.table("v"), Seq.empty)
// insert data to verify view still works correctly
@@ -2745,6 +2729,7 @@ class DataSourceV2DataFrameSuite
// create temp view using DataFrame API
spark.table(t).createOrReplaceTempView("v")
+ assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "address"))
checkAnswer(spark.table("v"), Seq.empty)
// add nested column to underlying table
@@ -2769,6 +2754,7 @@ class DataSourceV2DataFrameSuite
// create temp view
spark.table(t).createOrReplaceTempView("v")
+ assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data", "age"))
checkAnswer(spark.table("v"), Seq.empty)
// drop column from underlying table
@@ -2793,6 +2779,7 @@ class DataSourceV2DataFrameSuite
// create temp view using DataFrame API
spark.table(t).createOrReplaceTempView("v")
+ assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "address"))
checkAnswer(spark.table("v"), Seq.empty)
// drop nested column from underlying table
@@ -2817,6 +2804,7 @@ class DataSourceV2DataFrameSuite
// create temp view
spark.table(t).createOrReplaceTempView("v")
+ assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
checkAnswer(spark.table("v"), Seq.empty)
// change nullability constraint using ALTER TABLE
@@ -2858,6 +2846,7 @@ class DataSourceV2DataFrameSuite
assert(originalTableId != newTableId)
// accessing temp view should work despite table ID change (returns empty data)
+ assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
checkAnswer(spark.table("v"), Seq.empty)
// insert new data and verify view reflects it
@@ -2873,6 +2862,7 @@ class DataSourceV2DataFrameSuite
sql(s"CREATE TABLE $t (id bigint, data STRING, extra INT) USING foo")
spark.table(t).createOrReplaceTempView("v")
+ assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data", "extra"))
checkAnswer(spark.table("v"), Seq.empty)
// alter table
@@ -2883,6 +2873,7 @@ class DataSourceV2DataFrameSuite
// recreate view with updated schema
spark.table(t).createOrReplaceTempView("v")
+ assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
checkAnswer(spark.table("v"), Seq.empty)
// now it should work with new schema
@@ -2913,6 +2904,7 @@ class DataSourceV2DataFrameSuite
// accessing temp view should succeed as top-level column additions are allowed
+ assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
checkAnswer(spark.table("v"), Seq.empty)
}
}
@@ -2925,6 +2917,7 @@ class DataSourceV2DataFrameSuite
// create temp view using SQL that should capture plan
sql(s"CREATE OR REPLACE TEMPORARY VIEW v AS SELECT * FROM $t")
+ assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
checkAnswer(spark.table("v"), Seq.empty)
// verify that view stores analyzed plan
@@ -2935,6 +2928,7 @@ class DataSourceV2DataFrameSuite
sql(s"ALTER TABLE $t ADD COLUMN age int")
// accessing temp view should succeed as top-level column additions are allowed
+ assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
checkAnswer(spark.table("v"), Seq.empty)
// insert data to verify view still works correctly
@@ -2951,6 +2945,7 @@ class DataSourceV2DataFrameSuite
// create temp view
spark.table(t).createOrReplaceTempView("v")
+ assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "name"))
checkAnswer(spark.table("v"), Seq.empty)
// change VARCHAR(10) to VARCHAR(20)
@@ -2975,6 +2970,7 @@ class DataSourceV2DataFrameSuite
// create temp view
spark.table(t).createOrReplaceTempView("v")
+ assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
checkAnswer(spark.table("v"), Seq.empty)
// insert data into underlying table (no schema change)
@@ -2993,6 +2989,581 @@ class DataSourceV2DataFrameSuite
}
}
+ // Temp views with stored plans: scenarios from the DSv2 table refresh tests.
+ // Each test creates a DSv2 table with initial data, builds a temp view with a filter
+ // (to demonstrate that the stored plan is non-trivial), and then verifies the view
+ // behavior after various table modifications (session or external).
+
+ /** Appends a row to a DSv2 table via the catalog API, bypassing the session. */
+ // The row layout must match the current table column order.
+ private def externalAppend(
+ catalogName: String,
+ ident: Identifier,
+ row: InternalRow): Unit = {
+ val extTable = catalog(catalogName).loadTable(ident,
+ util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+ val schema = CatalogV2Util.v2ColumnsToStructType(extTable.columns())
+ extTable.withData(Array(new BufferedRows(Seq.empty, schema).withRow(row)))
+ }
+
+ // Scenario 1.1 (session write)
+ test("temp view with stored plan reflects session write") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ sql(s"INSERT INTO $t VALUES (2, 200)")
+
+ checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 1.2 (external write)
+ test("temp view with stored plan reflects external write") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external writer adds (2, 200) via direct catalog API
+ externalAppend(
+ catalogName = "testcat",
+ ident = ident,
+ row = InternalRow(2, 200))
+
+ checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 1.2 connector w/ cache (external write, caching connector)
+ test("connector w/ cache: temp view stale after external write") {
+ val t = "cachingcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external writer adds (2, 200) via catalog API (bypasses cache)
+ externalAppend(
+ catalogName = "cachingcat",
+ ident = ident,
+ row = InternalRow(2, 200))
+
+ // Caching connector returns stale table: external write invisible
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, external write becomes visible
+ sql(s"REFRESH TABLE $t")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 2.1 (session ADD COLUMN)
+ test("temp view with stored plan preserves schema after session ADD COLUMN") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ sql(s"ALTER TABLE $t ADD COLUMN new_column INT")
+ sql(s"INSERT INTO $t VALUES (2, 200, -1)")
+
+ // view preserves original 2-column schema, filter still applied
+ checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 2.2 (external ADD COLUMN)
+ test("temp view with stored plan preserves schema after external ADD COLUMN") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external schema change via catalog API
+ val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true)
+ catalog("testcat").alterTable(ident, addCol)
+
+ // external writer adds data with new schema
+ externalAppend(
+ catalogName = "testcat",
+ ident = ident,
+ row = InternalRow(2, 200, -1))
+
+ // view preserves original 2-column schema, filter still applied
+ checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 2.2 connector w/ cache (external ADD COLUMN, caching connector)
+ test("connector w/ cache: temp view stale after external ADD COLUMN") {
+ val t = "cachingcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external schema change + data via catalog API
+ val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true)
+ catalog("cachingcat").alterTable(ident, addCol)
+
+ externalAppend(
+ catalogName = "cachingcat",
+ ident = ident,
+ row = InternalRow(2, 200, -1))
+
+ // Caching connector returns stale table: external changes invisible
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, view preserves original 2-column schema
+ sql(s"REFRESH TABLE $t")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 3.1 (session column removal)
+ test("temp view with stored plan detects session column removal") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // session schema change via SQL
+ sql(s"ALTER TABLE $t DROP COLUMN salary")
+
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect() },
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` INT has been removed"))
+ }
+ }
+
+ // Scenario 3.2 (external column removal)
+ test("temp view with stored plan detects external column removal") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external schema change via catalog API
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ catalog("testcat").alterTable(ident, dropCol)
+
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect() },
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` INT has been removed"))
+ }
+ }
+
+ // Scenario 3.2 connector w/ cache (external column removal, caching connector)
+ test("connector w/ cache: temp view stale after external column removal") {
+ val t = "cachingcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external column removal via catalog API
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ catalog("cachingcat").alterTable(ident, dropCol)
+
+ // Caching connector returns stale table: column removal invisible, no error
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, column removal detected
+ sql(s"REFRESH TABLE $t")
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect() },
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` INT has been removed"))
+ }
+ }
+
+ // Scenario 4.1 (session drop and recreate table)
+ test("temp view with stored plan resolves to session-recreated table") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ val originalTableId = catalog("testcat").loadTable(ident).id
+
+ // session drop and recreate via SQL
+ sql(s"DROP TABLE $t")
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+
+ val newTableId = catalog("testcat").loadTable(ident).id
+ assert(originalTableId != newTableId)
+
+ // view resolves to the new empty table
+ checkAnswer(spark.table("v"), Seq.empty)
+
+ // insert new data and verify the view picks it up
+ sql(s"INSERT INTO $t VALUES (2, 200)")
+ checkAnswer(spark.table("v"), Seq(Row(2, 200)))
+ }
+ }
+
+ // Scenario 4.2 (external drop and recreate table)
+ test("temp view with stored plan resolves to externally recreated table") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ val originalTableId = catalog("testcat").loadTable(ident).id
+
+ // external drop and recreate via catalog API
+ catalog("testcat").dropTable(ident)
+ catalog("testcat").createTable(
+ ident,
+ new TableInfo.Builder()
+ .withColumns(Array(
+ Column.create("id", IntegerType),
+ Column.create("salary", IntegerType)))
+ .build())
+
+ val newTableId = catalog("testcat").loadTable(ident).id
+ assert(originalTableId != newTableId)
+
+ // view resolves to the new empty table
+ checkAnswer(spark.table("v"), Seq.empty)
+
+ // insert new data and verify the view picks it up
+ sql(s"INSERT INTO $t VALUES (2, 200)")
+ checkAnswer(spark.table("v"), Seq(Row(2, 200)))
+ }
+ }
+
+ // Scenario 4.2 connector w/ cache (external drop/recreate, caching connector)
+ test("connector w/ cache: temp view stale after external drop/recreate") {
+ val t = "cachingcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external drop and recreate via catalog API
+ catalog("cachingcat").dropTable(ident)
+ catalog("cachingcat").createTable(
+ ident,
+ new TableInfo.Builder()
+ .withColumns(Array(
+ Column.create("id", IntegerType),
+ Column.create("salary", IntegerType)))
+ .build())
+
+ // Caching connector returns stale table: drop/recreate invisible
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, view resolves to new empty table
+ sql(s"REFRESH TABLE $t")
+ checkAnswer(spark.table("v"), Seq.empty)
+ }
+ }
+
+ // Scenario 5.1 (session drop and re-add column with same type, multiple views)
+ test("temp view with stored plan after session drop and re-add column same type" +
+ " with unfiltered view") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ spark.table(t).createOrReplaceTempView("v_no_filter")
+ spark.table(t).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+ checkAnswer(spark.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000)))
+ checkAnswer(spark.table("v_filter_is_null"), Seq.empty)
+
+ // drop and re-add column with same name and type
+ sql(s"ALTER TABLE $t DROP COLUMN salary")
+ sql(s"ALTER TABLE $t ADD COLUMN salary INT")
+
+ // salary values are now null, so the filtered view returns nothing
+ checkAnswer(spark.table("v"), Seq.empty)
+ // unfiltered view returns rows with null salary
+ checkAnswer(spark.table("v_no_filter"), Seq(Row(1, null), Row(10, null)))
+ // IS NULL filter now matches all rows
+ checkAnswer(spark.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null)))
+ }
+ }
+
+ // Scenario 5.2 (external drop and re-add column with same type)
+ test("temp view with stored plan after external drop and re-add column same type") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ spark.table(t).createOrReplaceTempView("v_no_filter")
+ spark.table(t).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+ checkAnswer(spark.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000)))
+ checkAnswer(spark.table("v_filter_is_null"), Seq.empty)
+
+ // external drop and re-add column via catalog API
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ val addCol = TableChange.addColumn(Array("salary"), IntegerType, true)
+ catalog("testcat").alterTable(ident, dropCol, addCol)
+
+ // salary values are now null, so the filtered view returns nothing
+ checkAnswer(spark.table("v"), Seq.empty)
+ // unfiltered view returns rows with null salary
+ checkAnswer(spark.table("v_no_filter"), Seq(Row(1, null), Row(10, null)))
+ // IS NULL filter now matches all rows
+ checkAnswer(spark.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null)))
+ }
+ }
+
+ // Scenario 5.2 connector w/ cache (external drop/re-add column, caching connector)
+ test("connector w/ cache: temp view stale after external drop/re-add column same type") {
+ val t = "cachingcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external drop and re-add column with same type via catalog API
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ val addCol = TableChange.addColumn(Array("salary"), IntegerType, true)
+ catalog("cachingcat").alterTable(ident, dropCol, addCol)
+
+ // Caching connector returns stale table: column drop/re-add invisible
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, salary values are null
+ sql(s"REFRESH TABLE $t")
+ checkAnswer(spark.table("v"), Seq.empty)
+ }
+ }
+
+ // Scenario 6.1 (session drop and re-add column with different type)
+ test("temp view with stored plan detects session column type change") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // drop and re-add column with same name but different type
+ sql(s"ALTER TABLE $t DROP COLUMN salary")
+ sql(s"ALTER TABLE $t ADD COLUMN salary STRING")
+
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect() },
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` type has changed from INT to STRING"))
+ }
+ }
+
+ // Scenario 6.2 (external drop and re-add column with different type)
+ test("temp view with stored plan detects external column type change") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external drop and re-add column with different type via catalog API
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ val addCol = TableChange.addColumn(Array("salary"), StringType, true)
+ catalog("testcat").alterTable(ident, dropCol, addCol)
+
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect() },
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` type has changed from INT to STRING"))
+ }
+ }
+
+ // Scenario 6.2 connector w/ cache (external column type change, caching connector)
+ test("connector w/ cache: temp view stale after external column type change") {
+ val t = "cachingcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external drop and re-add column with different type via catalog API
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ val addCol = TableChange.addColumn(Array("salary"), StringType, true)
+ catalog("cachingcat").alterTable(ident, dropCol, addCol)
+
+ // Caching connector returns stale table: type change invisible, no error
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, type change detected
+ sql(s"REFRESH TABLE $t")
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect() },
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` type has changed from INT to STRING"))
+ }
+ }
+
+ // Scenario 7.1 (session type widening from INT to BIGINT)
+ test("temp view with stored plan detects session type widening") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // session type widening via SQL
+ sql(s"ALTER TABLE $t ALTER COLUMN salary TYPE LONG")
+
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect() },
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` type has changed from INT to BIGINT"))
+ }
+ }
+
+ // Scenario 7.2 (external type widening from INT to BIGINT)
+ test("temp view with stored plan detects external type widening") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // widen salary type from INT to BIGINT via catalog API
+ val updateType = TableChange.updateColumnType(Array("salary"), LongType)
+ catalog("testcat").alterTable(ident, updateType)
+
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect() },
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` type has changed from INT to BIGINT"))
+ }
+ }
+
+ // Scenario 7.2 connector w/ cache (external type widening, caching connector)
+ test("connector w/ cache: temp view stale after external type widening") {
+ val t = "cachingcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external type widening via catalog API
+ val updateType = TableChange.updateColumnType(Array("salary"), LongType)
+ catalog("cachingcat").alterTable(ident, updateType)
+
+ // Caching connector returns stale table: type change invisible, no error
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, type change detected
+ sql(s"REFRESH TABLE $t")
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect() },
+ condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` type has changed from INT to BIGINT"))
+ }
+ }
+
test("cached DSv2 table DataFrame is refreshed and reused after insert") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
@@ -3077,6 +3648,7 @@ class DataSourceV2DataFrameSuite
// verify external changes are reflected correctly when table is queried
assertNotCached(spark.table(t))
+ assert(spark.table(t).schema.fieldNames.toSeq == Seq("id", "value", "category"))
checkAnswer(spark.table(t), Seq.empty)
}
}
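Every `checkError` assertion in the type-change scenarios (6.x and 7.x) expects the same error shape. That shape is one `- `col` type has changed from X to Y` line for each column whose type differs from the schema captured when the temp view's plan was stored. The Scala sketch below is a minimal, hypothetical model of that comparison. It is not Spark's implementation. `StoredPlanSchemaCheck`, `Column` and `typeChangeErrors` are illustrative names, and data types are plain strings. The sketch only mirrors the outcomes the tests assert:

- A type change such as INT to STRING or INT to BIGINT produces an error line.
- Dropping and re-adding a column with the same type produces none.

```scala
// Hypothetical, simplified model of the stored-plan type check exercised by the tests above.
// Not a Spark API: names and the string-typed schema are assumptions made for illustration.
object StoredPlanSchemaCheck {

  // A column as (name, SQL type name), e.g. Column("salary", "INT").
  final case class Column(name: String, dataType: String)

  /**
   * Compares the schema captured at view creation with the table's current schema and
   * returns one "- `col` type has changed from X to Y" line per column whose type differs.
   * Columns that exist only in the current schema are ignored. Columns that are missing
   * from the current schema are also not reported; that case is out of scope for this sketch.
   */
  def typeChangeErrors(captured: Seq[Column], current: Seq[Column]): Seq[String] = {
    val currentTypes: Map[String, String] = current.map(c => c.name -> c.dataType).toMap
    captured.flatMap { c =>
      currentTypes.get(c.name) match {
        case Some(newType) if newType != c.dataType =>
          Some(s"- `${c.name}` type has changed from ${c.dataType} to $newType")
        case _ => None
      }
    }
  }
}
```

In this model, a drop/re-add with the same type (scenario 5) leaves the `(name, type)` pair unchanged, so no error is produced. This matches the tests, where the view keeps working and simply sees null `salary` values.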