This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 47ca28f3b77e [SPARK-56619][SQL][TEST] Add DSv2 repeated table access
tests with internal/external changes in Classic/Connect mode
47ca28f3b77e is described below
commit 47ca28f3b77efcad8a32052d7c25b33718cd1fde
Author: Thang Long Vu <[email protected]>
AuthorDate: Wed May 27 09:28:05 2026 +0800
[SPARK-56619][SQL][TEST] Add DSv2 repeated table access tests with
internal/external changes in Classic/Connect mode
### What changes were proposed in this pull request?
Add 9 tests verifying that repeated `sql()` calls on DSv2 tables correctly
reflect external changes made via the catalog API, covering both classic and
Connect modes (so 18 tests in total).
`DSv2RepeatedTableAccessTests` extends `DSv2ExternalMutationTestBase`,
following the same pattern as `DSv2TempViewWithStoredPlanTests` (PR #55571).
The tests cover three external mutation scenarios, each with a session mutation
baseline, an external mutation test, and a caching-connector variant:
- **Scenario 1 (external writes)**: External data appended via the catalog
API is visible to subsequent `sql()` queries.
- **Scenario 2 (external schema changes)**: External ADD COLUMN via the
catalog API is visible to subsequent `sql()` queries.
- **Scenario 3 (external drop/recreate)**: External drop and recreate via
the catalog API resolves to the new empty table.
For each scenario, the caching-connector variant (`cachingcat`)
demonstrates that a connector with its own `loadTable` cache returns stale
results until `REFRESH TABLE` invalidates the cache.
#### New files
- **`DSv2RepeatedTableAccessTests`**: Shared trait containing all 9 tests
(3 scenarios x 3 variants: session mutation baseline, external mutation,
caching connector), using `session.sql(...)` with `.collect()` calls (harmless
in classic mode, required for Connect).
#### Modified files
- **`DataSourceV2DataFrameSuite`**: Mixes in `DSv2RepeatedTableAccessTests`
(classic runner, `testPrefix = ""`).
- **`DataSourceV2TempViewConnectSuite` ->
`DataSourceV2DataFrameConnectSuite`**: Renamed and now mixes in both
`DSv2TempViewWithStoredPlanTests` and `DSv2RepeatedTableAccessTests`. This is
the Connect counterpart of the classic `DataSourceV2DataFrameSuite`.
- **`DSv2ExternalMutationTestBase`**: Scaladoc updated to reference both
consumer traits.
### Why are the changes needed?
These tests document and lock down the expected behavior: repeated `sql()`
access without CACHE TABLE always sees the latest table state after external
mutations, unless the connector itself caches `loadTable` results (in which
case the results stay stale until `REFRESH TABLE` invalidates that cache).
This prevents regressions if internal resolution or caching logic
changes. The trait extraction enables Connect-mode reuse without duplicating
test logic.
### Does this PR introduce _any_ user-facing change?
No. This PR is test-only.
### How was this patch tested?
9 new tests run in both classic and Connect modes:
Classic:
```
build/sbt 'sql/testOnly *DataSourceV2DataFrameSuite -- -z "repeated sql()"'
```
Connect:
```
build/sbt 'connect/server/testOnly *DataSourceV2DataFrameConnectSuite -- -z "repeated sql()"'
```
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-6)
Closes #55462 from longvu-db/dsv2-pr2-repeated-sql.
Authored-by: Thang Long Vu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit e40d17535e6b52c71e5fde9de3e23dc9680b7c41)
Signed-off-by: Wenchen Fan <[email protected]>
---
...ala => DataSourceV2DataFrameConnectSuite.scala} | 14 +-
.../connector/DSv2ExternalMutationTestBase.scala | 13 +-
.../connector/DSv2RepeatedTableAccessTests.scala | 222 ++++++++++++++++++++
.../DSv2TempViewWithStoredPlanTests.scala | 228 ++++++++++-----------
.../sql/connector/DataSourceV2DataFrameSuite.scala | 3 +-
5 files changed, 358 insertions(+), 122 deletions(-)
diff --git
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
similarity index 85%
rename from
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala
rename to
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
index ce947379b233..a13e953460a7 100644
---
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala
+++
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2DataFrameConnectSuite.scala
@@ -21,17 +21,21 @@ import scala.reflect.ClassTag
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession}
-import org.apache.spark.sql.connector.DSv2TempViewWithStoredPlanTests
+import org.apache.spark.sql.connector.{DSv2RepeatedTableAccessTests,
DSv2TempViewWithStoredPlanTests}
import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog,
InMemoryTableCatalog, TableCatalog}
/**
- * Connect-mode runner for [[DSv2TempViewWithStoredPlanTests]]. All test logic
lives in the shared
- * trait; this class only provides the Connect-specific session, catalog
access, and result
+ * Connect-mode counterpart of
[[org.apache.spark.sql.connector.DataSourceV2DataFrameSuite]].
+ *
+ * Runs DSv2 temp view tests ([[DSv2TempViewWithStoredPlanTests]]) and
repeated table access tests
+ * ([[DSv2RepeatedTableAccessTests]]) under Spark Connect. All test logic
lives in the shared
+ * traits; this class only provides the Connect-specific session, catalog
access, and result
* comparison.
*/
-class DataSourceV2TempViewConnectSuite
+class DataSourceV2DataFrameConnectSuite
extends SparkConnectServerTest
- with DSv2TempViewWithStoredPlanTests {
+ with DSv2TempViewWithStoredPlanTests
+ with DSv2RepeatedTableAccessTests {
override def sparkConf: SparkConf = super.sparkConf
.set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
index 2e60c24c4460..4d16339e09de 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2ExternalMutationTestBase.scala
@@ -33,11 +33,20 @@ import
org.apache.spark.sql.connector.catalog.{BufferedRows, CatalogV2Util, Iden
* (where the test session IS the server session) and Connect mode (where the
test session
* is a Connect client and catalog access requires the server session).
*
- * Concrete suites override the abstract methods and mix in the test trait
- * [[DSv2TempViewWithStoredPlanTests]].
+ * Concrete suites override the abstract methods and mix in a test trait such
as
+ * [[DSv2TempViewWithStoredPlanTests]] or [[DSv2RepeatedTableAccessTests]].
*/
trait DSv2ExternalMutationTestBase extends QueryTest {
+ /** Fully qualified table name under the non-caching test catalog. */
+ protected val testTable: String = "testcat.ns1.ns2.tbl"
+
+ /** Fully qualified table name under the caching test catalog. */
+ protected val cachingTestTable: String = "cachingcat.ns1.ns2.tbl"
+
+ /** Identifier for the test table within its namespace. */
+ protected val testIdent: Identifier = Identifier.of(Array("ns1", "ns2"),
"tbl")
+
/** Prefix for test names, e.g. "" or "[connect] ". */
protected def testPrefix: String
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2RepeatedTableAccessTests.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2RepeatedTableAccessTests.scala
new file mode 100644
index 000000000000..533d10a94979
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2RepeatedTableAccessTests.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog,
Column, InMemoryTableCatalog, TableChange, TableInfo}
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Shared repeated table access tests with external changes for DSv2 tables.
These tests verify
+ * that repeated `sql()` calls correctly reflect both session and external
mutations:
+ *
+ * - Scenario 1 (external writes): external data appended via the catalog API
is visible.
+ * - Scenario 2 (external schema changes): external ADD COLUMN via the
catalog API is visible.
+ * - Scenario 3 (external drop/recreate): external drop and recreate via the
catalog API
+ * resolves to the new empty table.
+ *
+ * Each scenario includes a session mutation baseline, an external mutation
test, and a
+ * caching-connector variant showing stale results until `REFRESH TABLE`.
+ *
+ * NOTE: All `session.sql(...)` calls append `.collect()` because Connect
client DataFrames
+ * are lazy and require an action to trigger execution. In classic mode
`.collect()` on
+ * DDL / DML is a no-op (these execute eagerly), so this is harmless.
+ */
+trait DSv2RepeatedTableAccessTests extends DSv2ExternalMutationTestBase {
+
+ // Uses testTable, cachingTestTable, and testIdent from
DSv2ExternalMutationTestBase.
+
+ // Scenario 1: data changes via writes
+
+ test(s"${testPrefix}repeated sql() reflects session write") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+ checkRows(session.sql(s"SELECT * FROM $testTable"), Seq(Row(1, 100)))
+
+ session.sql(s"INSERT INTO $testTable VALUES (2, 200)").collect()
+ checkRows(session.sql(s"SELECT * FROM $testTable"), Seq(Row(1, 100),
Row(2, 200)))
+ }
+ }
+ }
+
+ test(s"${testPrefix}repeated sql() reflects external write") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+ checkRows(session.sql(s"SELECT * FROM $testTable"), Seq(Row(1, 100)))
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ externalAppend(catalog = catalog, ident = testIdent, row =
InternalRow(2, 200))
+
+ checkRows(session.sql(s"SELECT * FROM $testTable"), Seq(Row(1, 100),
Row(2, 200)))
+ }
+ }
+ }
+
+ test(s"${testPrefix}connector w/ cache: repeated sql() stale after external
write") {
+ withTestSession { session =>
+ withTestTableAndViews(session, cachingTestTable) {
+ session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT)
USING foo").collect()
+ session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100)").collect()
+ checkRows(session.sql(s"SELECT * FROM $cachingTestTable"), Seq(Row(1,
100)))
+
+ val catalog = getTableCatalog[CachingInMemoryTableCatalog](session,
"cachingcat")
+ externalAppend(catalog = catalog, ident = testIdent, row =
InternalRow(2, 200))
+
+ // Caching connector returns stale table: external write invisible
+ checkRows(session.sql(s"SELECT * FROM $cachingTestTable"), Seq(Row(1,
100)))
+
+ // REFRESH TABLE invalidates the connector cache, external write
becomes visible
+ session.sql(s"REFRESH TABLE $cachingTestTable").collect()
+ checkRows(session.sql(s"SELECT * FROM $cachingTestTable"), Seq(Row(1,
100), Row(2, 200)))
+ }
+ }
+ }
+
+ // Scenario 2: schema changes
+
+ test(s"${testPrefix}repeated sql() reflects session schema change") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+ checkRows(session.sql(s"SELECT * FROM $testTable"), Seq(Row(1, 100)))
+
+ session.sql(s"ALTER TABLE $testTable ADD COLUMN new_col INT").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (2, 200, -1)").collect()
+ checkRows(
+ session.sql(s"SELECT * FROM $testTable"),
+ Seq(Row(1, 100, null), Row(2, 200, -1)))
+ }
+ }
+ }
+
+ test(s"${testPrefix}repeated sql() reflects external schema change") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+ checkRows(session.sql(s"SELECT * FROM $testTable"), Seq(Row(1, 100)))
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ val addCol = TableChange.addColumn(Array("new_col"), IntegerType, true)
+ catalog.alterTable(testIdent, addCol)
+
+ externalAppend(catalog = catalog, ident = testIdent, row =
InternalRow(2, 200, -1))
+
+ checkRows(
+ session.sql(s"SELECT * FROM $testTable"),
+ Seq(Row(1, 100, null), Row(2, 200, -1)))
+ }
+ }
+ }
+
+ test(s"${testPrefix}connector w/ cache: repeated sql() stale after external
schema change") {
+ withTestSession { session =>
+ withTestTableAndViews(session, cachingTestTable) {
+ session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT)
USING foo").collect()
+ session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100)").collect()
+ checkRows(session.sql(s"SELECT * FROM $cachingTestTable"), Seq(Row(1,
100)))
+
+ val catalog = getTableCatalog[CachingInMemoryTableCatalog](session,
"cachingcat")
+ val addCol = TableChange.addColumn(Array("new_col"), IntegerType, true)
+ catalog.alterTable(testIdent, addCol)
+
+ externalAppend(catalog = catalog, ident = testIdent, row =
InternalRow(2, 200, -1))
+
+ // Caching connector returns stale table: external changes invisible
+ checkRows(session.sql(s"SELECT * FROM $cachingTestTable"), Seq(Row(1,
100)))
+
+ // REFRESH TABLE invalidates the connector cache, schema change + data
visible
+ session.sql(s"REFRESH TABLE $cachingTestTable").collect()
+ checkRows(
+ session.sql(s"SELECT * FROM $cachingTestTable"),
+ Seq(Row(1, 100, null), Row(2, 200, -1)))
+ }
+ }
+ }
+
+ // Scenario 3: drop and recreate table
+
+ test(s"${testPrefix}repeated sql() reflects session drop/recreate") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+ checkRows(session.sql(s"SELECT * FROM $testTable"), Seq(Row(1, 100)))
+
+ session.sql(s"DROP TABLE $testTable").collect()
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ checkRows(session.sql(s"SELECT * FROM $testTable"), Seq.empty)
+ }
+ }
+ }
+
+ test(s"${testPrefix}repeated sql() reflects external drop/recreate") {
+ withTestSession { session =>
+ withTestTableAndViews(session, testTable) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100)").collect()
+ checkRows(session.sql(s"SELECT * FROM $testTable"), Seq(Row(1, 100)))
+
+ val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
+ catalog.dropTable(testIdent)
+ catalog.createTable(
+ testIdent,
+ new TableInfo.Builder()
+ .withColumns(Array(
+ Column.create("id", IntegerType),
+ Column.create("salary", IntegerType)))
+ .build())
+
+ checkRows(session.sql(s"SELECT * FROM $testTable"), Seq.empty)
+ }
+ }
+ }
+
+ test(s"${testPrefix}connector w/ cache: repeated sql() stale after external
drop/recreate") {
+ withTestSession { session =>
+ withTestTableAndViews(session, cachingTestTable) {
+ session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT)
USING foo").collect()
+ session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100)").collect()
+ checkRows(session.sql(s"SELECT * FROM $cachingTestTable"), Seq(Row(1,
100)))
+
+ val catalog = getTableCatalog[CachingInMemoryTableCatalog](session,
"cachingcat")
+ catalog.dropTable(testIdent)
+ catalog.createTable(
+ testIdent,
+ new TableInfo.Builder()
+ .withColumns(Array(
+ Column.create("id", IntegerType),
+ Column.create("salary", IntegerType)))
+ .build())
+
+ // Caching connector returns stale table: drop/recreate invisible
+ checkRows(session.sql(s"SELECT * FROM $cachingTestTable"), Seq(Row(1,
100)))
+
+ // REFRESH TABLE invalidates the connector cache, new empty table
visible
+ session.sql(s"REFRESH TABLE $cachingTestTable").collect()
+ checkRows(session.sql(s"SELECT * FROM $cachingTestTable"), Seq.empty)
+ }
+ }
+ }
+}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
index eb40e3ac056f..1a5229258e7d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DSv2TempViewWithStoredPlanTests.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.connector
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog,
Column, Identifier, InMemoryTableCatalog, TableChange, TableInfo}
+import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog,
Column, InMemoryTableCatalog, TableChange, TableInfo}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
/**
@@ -33,21 +33,19 @@ import org.apache.spark.sql.types.{IntegerType, LongType,
StringType}
*/
trait DSv2TempViewWithStoredPlanTests extends DSv2ExternalMutationTestBase {
- private val T = "testcat.ns1.ns2.tbl"
- private val CT = "cachingcat.ns1.ns2.tbl"
- private val testIdent = Identifier.of(Array("ns1", "ns2"), "tbl")
+ // Uses testTable, cachingTestTable, and testIdent from
DSv2ExternalMutationTestBase.
// Scenario 1.1 (session write)
test(s"${testPrefix}temp view with stored plan reflects session write") {
withTestSession { session =>
- withTestTableAndViews(session, T, Seq("v")) {
- session.sql(s"CREATE TABLE $T (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, testTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10,
1000)").collect()
- session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(testTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
- session.sql(s"INSERT INTO $T VALUES (2, 200)").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (2, 200)").collect()
checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
}
}
@@ -56,11 +54,11 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// Scenario 1.2 (external write)
test(s"${testPrefix}temp view with stored plan reflects external write") {
withTestSession { session =>
- withTestTableAndViews(session, T, Seq("v")) {
- session.sql(s"CREATE TABLE $T (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, testTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10,
1000)").collect()
- session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(testTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
@@ -74,11 +72,11 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// Scenario 1.2 connector w/ cache (external write, caching connector)
test(s"${testPrefix}connector w/ cache: temp view stale after external
write") {
withTestSession { session =>
- withTestTableAndViews(session, CT, Seq("v")) {
- session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, cachingTestTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT)
USING foo").collect()
+ session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100), (10,
1000)").collect()
- session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(cachingTestTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
val catalog = getTableCatalog[CachingInMemoryTableCatalog](session,
"cachingcat")
@@ -88,7 +86,7 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
checkRows(session.table("v"), Seq(Row(1, 100)))
// REFRESH TABLE invalidates the connector cache, external write
becomes visible
- session.sql(s"REFRESH TABLE $CT").collect()
+ session.sql(s"REFRESH TABLE $cachingTestTable").collect()
checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
}
}
@@ -97,15 +95,15 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// Scenario 2.1 (session ADD COLUMN)
test(s"${testPrefix}temp view with stored plan preserves schema after
session ADD COLUMN") {
withTestSession { session =>
- withTestTableAndViews(session, T, Seq("v")) {
- session.sql(s"CREATE TABLE $T (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, testTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10,
1000)").collect()
- session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(testTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
- session.sql(s"ALTER TABLE $T ADD COLUMN new_column INT").collect()
- session.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect()
+ session.sql(s"ALTER TABLE $testTable ADD COLUMN new_column
INT").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (2, 200, -1)").collect()
// view preserves original 2-column schema, filter still applied
checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
@@ -116,11 +114,11 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// Scenario 2.2 (external ADD COLUMN)
test(s"${testPrefix}temp view with stored plan preserves schema after
external ADD COLUMN") {
withTestSession { session =>
- withTestTableAndViews(session, T, Seq("v")) {
- session.sql(s"CREATE TABLE $T (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, testTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10,
1000)").collect()
- session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(testTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
// external schema change via catalog API
@@ -139,11 +137,11 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// Scenario 2.2 connector w/ cache (external ADD COLUMN, caching connector)
test(s"${testPrefix}connector w/ cache: temp view stale after external ADD
COLUMN") {
withTestSession { session =>
- withTestTableAndViews(session, CT, Seq("v")) {
- session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, cachingTestTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT)
USING foo").collect()
+ session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100), (10,
1000)").collect()
- session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(cachingTestTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
val catalog = getTableCatalog[CachingInMemoryTableCatalog](session,
"cachingcat")
@@ -156,7 +154,7 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
checkRows(session.table("v"), Seq(Row(1, 100)))
// REFRESH TABLE invalidates the connector cache, view preserves
original 2-column schema
- session.sql(s"REFRESH TABLE $CT").collect()
+ session.sql(s"REFRESH TABLE $cachingTestTable").collect()
checkRows(session.table("v"), Seq(Row(1, 100), Row(2, 200)))
}
}
@@ -165,14 +163,14 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// Scenario 3.1 (session column removal)
test(s"${testPrefix}temp view with stored plan detects session column
removal") {
withTestSession { session =>
- withTestTableAndViews(session, T, Seq("v")) {
- session.sql(s"CREATE TABLE $T (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, testTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10,
1000)").collect()
- session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(testTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
- session.sql(s"ALTER TABLE $T DROP COLUMN salary").collect()
+ session.sql(s"ALTER TABLE $testTable DROP COLUMN salary").collect()
checkError(
exception = intercept[AnalysisException] {
session.table("v").collect() },
@@ -189,11 +187,11 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// Scenario 3.2 (external column removal)
test(s"${testPrefix}temp view with stored plan detects external column
removal") {
withTestSession { session =>
- withTestTableAndViews(session, T, Seq("v")) {
- session.sql(s"CREATE TABLE $T (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, testTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10,
1000)").collect()
- session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(testTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
@@ -215,11 +213,11 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// Scenario 3.2 connector w/ cache (external column removal, caching
connector)
test(s"${testPrefix}connector w/ cache: temp view stale after external
column removal") {
withTestSession { session =>
- withTestTableAndViews(session, CT, Seq("v")) {
- session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, cachingTestTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT)
USING foo").collect()
+ session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100), (10,
1000)").collect()
- session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(cachingTestTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
val catalog = getTableCatalog[CachingInMemoryTableCatalog](session,
"cachingcat")
@@ -230,7 +228,7 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
checkRows(session.table("v"), Seq(Row(1, 100)))
// REFRESH TABLE invalidates the connector cache, column removal
detected
- session.sql(s"REFRESH TABLE $CT").collect()
+ session.sql(s"REFRESH TABLE $cachingTestTable").collect()
checkError(
exception = intercept[AnalysisException] {
session.table("v").collect() },
condition =
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
@@ -246,18 +244,18 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// Scenario 4.1 (session drop and recreate table)
test(s"${testPrefix}temp view with stored plan resolves to session-recreated
table") {
withTestSession { session =>
- withTestTableAndViews(session, T, Seq("v")) {
- session.sql(s"CREATE TABLE $T (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, testTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10,
1000)").collect()
- session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(testTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
val originalTableId = catalog.loadTable(testIdent).id
- session.sql(s"DROP TABLE $T").collect()
- session.sql(s"CREATE TABLE $T (id INT, salary INT) USING
foo").collect()
+ session.sql(s"DROP TABLE $testTable").collect()
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
val newTableId = catalog.loadTable(testIdent).id
assert(originalTableId != newTableId)
@@ -265,7 +263,7 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// view resolves to the new empty table
checkRows(session.table("v"), Seq.empty)
- session.sql(s"INSERT INTO $T VALUES (2, 200)").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (2, 200)").collect()
checkRows(session.table("v"), Seq(Row(2, 200)))
}
}
@@ -274,11 +272,11 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// Scenario 4.2 (external drop and recreate table)
test(s"${testPrefix}temp view with stored plan resolves to externally
recreated table") {
withTestSession { session =>
- withTestTableAndViews(session, T, Seq("v")) {
- session.sql(s"CREATE TABLE $T (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, testTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10,
1000)").collect()
- session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(testTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
@@ -299,7 +297,7 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// view resolves to the new empty table
checkRows(session.table("v"), Seq.empty)
- session.sql(s"INSERT INTO $T VALUES (2, 200)").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (2, 200)").collect()
checkRows(session.table("v"), Seq(Row(2, 200)))
}
}
@@ -308,11 +306,11 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// Scenario 4.2 connector w/ cache (external drop/recreate, caching connector)
test(s"${testPrefix}connector w/ cache: temp view stale after external drop/recreate") {
withTestSession { session =>
- withTestTableAndViews(session, CT, Seq("v")) {
- session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, cachingTestTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT)
USING foo").collect()
+ session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100), (10,
1000)").collect()
- session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(cachingTestTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
val catalog = getTableCatalog[CachingInMemoryTableCatalog](session,
"cachingcat")
@@ -329,7 +327,7 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
checkRows(session.table("v"), Seq(Row(1, 100)))
// REFRESH TABLE invalidates the connector cache, view resolves to new empty table
- session.sql(s"REFRESH TABLE $CT").collect()
+ session.sql(s"REFRESH TABLE $cachingTestTable").collect()
checkRows(session.table("v"), Seq.empty)
}
}
@@ -339,20 +337,21 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
test(s"${testPrefix}temp view with stored plan after session drop and re-add column same type" +
" with unfiltered view") {
withTestSession { session =>
- withTestTableAndViews(session, T, Seq("v", "v_no_filter",
"v_filter_is_null")) {
- session.sql(s"CREATE TABLE $T (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
-
- session.table(T).filter("salary < 999").createOrReplaceTempView("v")
- session.table(T).createOrReplaceTempView("v_no_filter")
- session.table(T).filter("salary IS
NULL").createOrReplaceTempView("v_filter_is_null")
+ withTestTableAndViews(session, testTable, Seq("v", "v_no_filter",
"v_filter_is_null")) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10,
1000)").collect()
+
+ session.table(testTable).filter("salary <
999").createOrReplaceTempView("v")
+ session.table(testTable).createOrReplaceTempView("v_no_filter")
+ session.table(testTable).filter("salary IS NULL")
+ .createOrReplaceTempView("v_filter_is_null")
checkRows(session.table("v"), Seq(Row(1, 100)))
checkRows(session.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000)))
checkRows(session.table("v_filter_is_null"), Seq.empty)
// drop and re-add column with same name and type
- session.sql(s"ALTER TABLE $T DROP COLUMN salary").collect()
- session.sql(s"ALTER TABLE $T ADD COLUMN salary INT").collect()
+ session.sql(s"ALTER TABLE $testTable DROP COLUMN salary").collect()
+ session.sql(s"ALTER TABLE $testTable ADD COLUMN salary INT").collect()
// salary values are now null, so the filtered view returns nothing
checkRows(session.table("v"), Seq.empty)
@@ -368,13 +367,14 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
test(s"${testPrefix}temp view with stored plan after external drop and
re-add column " +
"same type") {
withTestSession { session =>
- withTestTableAndViews(session, T, Seq("v", "v_no_filter",
"v_filter_is_null")) {
- session.sql(s"CREATE TABLE $T (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
-
- session.table(T).filter("salary < 999").createOrReplaceTempView("v")
- session.table(T).createOrReplaceTempView("v_no_filter")
- session.table(T).filter("salary IS
NULL").createOrReplaceTempView("v_filter_is_null")
+ withTestTableAndViews(session, testTable, Seq("v", "v_no_filter",
"v_filter_is_null")) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10,
1000)").collect()
+
+ session.table(testTable).filter("salary <
999").createOrReplaceTempView("v")
+ session.table(testTable).createOrReplaceTempView("v_no_filter")
+ session.table(testTable).filter("salary IS NULL")
+ .createOrReplaceTempView("v_filter_is_null")
checkRows(session.table("v"), Seq(Row(1, 100)))
checkRows(session.table("v_no_filter"), Seq(Row(1, 100), Row(10,
1000)))
checkRows(session.table("v_filter_is_null"), Seq.empty)
@@ -399,11 +399,11 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
test(s"${testPrefix}connector w/ cache: temp view stale after external
drop/re-add column " +
"same type") {
withTestSession { session =>
- withTestTableAndViews(session, CT, Seq("v")) {
- session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, cachingTestTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT)
USING foo").collect()
+ session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100), (10,
1000)").collect()
- session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(cachingTestTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
val catalog = getTableCatalog[CachingInMemoryTableCatalog](session,
"cachingcat")
@@ -415,7 +415,7 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
checkRows(session.table("v"), Seq(Row(1, 100)))
// REFRESH TABLE invalidates the connector cache, salary values are
null
- session.sql(s"REFRESH TABLE $CT").collect()
+ session.sql(s"REFRESH TABLE $cachingTestTable").collect()
checkRows(session.table("v"), Seq.empty)
}
}
@@ -424,15 +424,15 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// Scenario 6.1 (session drop and re-add column with different type)
test(s"${testPrefix}temp view with stored plan detects session column type
change") {
withTestSession { session =>
- withTestTableAndViews(session, T, Seq("v")) {
- session.sql(s"CREATE TABLE $T (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, testTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10,
1000)").collect()
- session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(testTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
- session.sql(s"ALTER TABLE $T DROP COLUMN salary").collect()
- session.sql(s"ALTER TABLE $T ADD COLUMN salary STRING").collect()
+ session.sql(s"ALTER TABLE $testTable DROP COLUMN salary").collect()
+ session.sql(s"ALTER TABLE $testTable ADD COLUMN salary STRING").collect()
checkError(
exception = intercept[AnalysisException] {
session.table("v").collect() },
@@ -449,11 +449,11 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// Scenario 6.2 (external drop and re-add column with different type)
test(s"${testPrefix}temp view with stored plan detects external column type
change") {
withTestSession { session =>
- withTestTableAndViews(session, T, Seq("v")) {
- session.sql(s"CREATE TABLE $T (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, testTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10,
1000)").collect()
- session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(testTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
@@ -476,11 +476,11 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// Scenario 6.2 connector w/ cache (external column type change, caching
connector)
test(s"${testPrefix}connector w/ cache: temp view stale after external
column type change") {
withTestSession { session =>
- withTestTableAndViews(session, CT, Seq("v")) {
- session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, cachingTestTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT)
USING foo").collect()
+ session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100), (10,
1000)").collect()
- session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(cachingTestTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
val catalog = getTableCatalog[CachingInMemoryTableCatalog](session,
"cachingcat")
@@ -492,7 +492,7 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
checkRows(session.table("v"), Seq(Row(1, 100)))
// REFRESH TABLE invalidates the connector cache, type change detected
- session.sql(s"REFRESH TABLE $CT").collect()
+ session.sql(s"REFRESH TABLE $cachingTestTable").collect()
checkError(
exception = intercept[AnalysisException] {
session.table("v").collect() },
condition =
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
@@ -508,14 +508,14 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// Scenario 7.1 (session type widening from INT to BIGINT)
test(s"${testPrefix}temp view with stored plan detects session type
widening") {
withTestSession { session =>
- withTestTableAndViews(session, T, Seq("v")) {
- session.sql(s"CREATE TABLE $T (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, testTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10,
1000)").collect()
- session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(testTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
- session.sql(s"ALTER TABLE $T ALTER COLUMN salary TYPE LONG").collect()
+ session.sql(s"ALTER TABLE $testTable ALTER COLUMN salary TYPE LONG").collect()
checkError(
exception = intercept[AnalysisException] {
session.table("v").collect() },
@@ -532,11 +532,11 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// Scenario 7.2 (external type widening from INT to BIGINT)
test(s"${testPrefix}temp view with stored plan detects external type
widening") {
withTestSession { session =>
- withTestTableAndViews(session, T, Seq("v")) {
- session.sql(s"CREATE TABLE $T (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, testTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $testTable (id INT, salary INT) USING
foo").collect()
+ session.sql(s"INSERT INTO $testTable VALUES (1, 100), (10,
1000)").collect()
- session.table(T).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(testTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
val catalog = getTableCatalog[InMemoryTableCatalog](session, "testcat")
@@ -558,11 +558,11 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
// Scenario 7.2 connector w/ cache (external type widening, caching
connector)
test(s"${testPrefix}connector w/ cache: temp view stale after external type
widening") {
withTestSession { session =>
- withTestTableAndViews(session, CT, Seq("v")) {
- session.sql(s"CREATE TABLE $CT (id INT, salary INT) USING
foo").collect()
- session.sql(s"INSERT INTO $CT VALUES (1, 100), (10, 1000)").collect()
+ withTestTableAndViews(session, cachingTestTable, Seq("v")) {
+ session.sql(s"CREATE TABLE $cachingTestTable (id INT, salary INT)
USING foo").collect()
+ session.sql(s"INSERT INTO $cachingTestTable VALUES (1, 100), (10,
1000)").collect()
- session.table(CT).filter("salary < 999").createOrReplaceTempView("v")
+ session.table(cachingTestTable).filter("salary <
999").createOrReplaceTempView("v")
checkRows(session.table("v"), Seq(Row(1, 100)))
val catalog = getTableCatalog[CachingInMemoryTableCatalog](session,
"cachingcat")
@@ -573,7 +573,7 @@ trait DSv2TempViewWithStoredPlanTests extends
DSv2ExternalMutationTestBase {
checkRows(session.table("v"), Seq(Row(1, 100)))
// REFRESH TABLE invalidates the connector cache, type change detected
- session.sql(s"REFRESH TABLE $CT").collect()
+ session.sql(s"REFRESH TABLE $cachingTestTable").collect()
checkError(
exception = intercept[AnalysisException] {
session.table("v").collect() },
condition =
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 13f8a3455480..139a6c75d793 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -47,7 +47,8 @@ import org.apache.spark.unsafe.types.UTF8String
class DataSourceV2DataFrameSuite
extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = false)
- with DSv2TempViewWithStoredPlanTests {
+ with DSv2TempViewWithStoredPlanTests
+ with DSv2RepeatedTableAccessTests {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import testImplicits._
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]