This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cc9d9911d7ed [SPARK-46272][SQL] Support CTAS using DSv2 sources
cc9d9911d7ed is described below
commit cc9d9911d7eddfb3bdc7b1fa621687f42930e13a
Author: allisonwang-db <[email protected]>
AuthorDate: Wed Dec 20 16:17:45 2023 +0800
[SPARK-46272][SQL] Support CTAS using DSv2 sources
### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/43949 supports CREATE TABLE using DSv2
sources. This PR supports CREATE TABLE AS SELECT (CTAS) using DSv2 sources. It
turns out that we don't need additional code changes. This PR simply adds more
test cases for CTAS queries.
### Why are the changes needed?
To add tests for CTAS for DSv2 sources.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #44190 from allisonwang-db/spark-46272-ctas.
Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-classes.json | 6 +
docs/sql-error-conditions.md | 6 +
.../spark/sql/errors/QueryCompilationErrors.scala | 13 +-
.../datasources/v2/TableCapabilityCheck.scala | 2 +-
.../datasources/v2/V2SessionCatalog.scala | 19 ++-
.../DataSourceV2DataFrameSessionCatalogSuite.scala | 2 +-
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 2 +-
.../spark/sql/connector/DataSourceV2Suite.scala | 154 ++++++++++++++++++++-
.../spark/sql/connector/FakeV2Provider.scala | 22 +++
.../connector/SupportsCatalogOptionsSuite.scala | 2 +-
.../spark/sql/connector/V1WriteFallbackSuite.scala | 2 +-
.../streaming/test/DataStreamTableAPISuite.scala | 4 +-
12 files changed, 217 insertions(+), 17 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json
b/common/utils/src/main/resources/error/error-classes.json
index 30aacc07d318..930042505379 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -893,6 +893,12 @@
],
"sqlState" : "42K02"
},
+ "DATA_SOURCE_TABLE_SCHEMA_MISMATCH" : {
+ "message" : [
+ "The schema of the data source table <tableSchema> does not match the
actual schema <actualSchema>. If you are using the DataFrameReader.schema API
or creating a table, avoid specifying the schema."
+ ],
+ "sqlState" : "42K03"
+ },
"DATETIME_OVERFLOW" : {
"message" : [
"Datetime operation overflow: <operation>."
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index e4b04ce02fe2..94c7c167e392 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -492,6 +492,12 @@ Data source '`<provider>`' not found. Please make sure the
data source is regist
Failed to find the data source: `<provider>`. Please find packages at
`https://spark.apache.org/third-party-projects.html`.
+### DATA_SOURCE_TABLE_SCHEMA_MISMATCH
+
+[SQLSTATE:
42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+The schema of the data source table `<tableSchema>` does not match the actual
schema `<actualSchema>`. If you are using the DataFrameReader.schema API or
creating a table, avoid specifying the schema.
+
### DATETIME_OVERFLOW
[SQLSTATE: 22008](sql-error-conditions-sqlstates.html#class-22-data-exception)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index cdab854c004b..3fd1fe04aed6 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -926,8 +926,8 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase with Compilat
unsupportedTableOperationError(table.name(), "either micro-batch or
continuous scan")
}
- def unsupportedAppendInBatchModeError(table: Table): Throwable = {
- unsupportedTableOperationError(table.name(), "append in batch mode")
+ def unsupportedAppendInBatchModeError(name: String): Throwable = {
+ unsupportedTableOperationError(name, "append in batch mode")
}
def unsupportedDynamicOverwriteInBatchModeError(table: Table): Throwable = {
@@ -3924,4 +3924,13 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase with Compilat
errorClass = "NESTED_EXECUTE_IMMEDIATE",
messageParameters = Map("sqlString" -> toSQLStmt(queryString)))
}
+
+ def dataSourceTableSchemaMismatchError(
+ tableSchema: StructType, actualSchema: StructType): Throwable = {
+ new AnalysisException(
+ errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
+ messageParameters = Map(
+ "tableSchema" -> toSQLType(tableSchema),
+ "actualSchema" -> toSQLType(actualSchema)))
+ }
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
index b1a93addc80b..e332c6b8014a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
@@ -47,7 +47,7 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {
// TODO: check STREAMING_WRITE capability. It's not doable now because
we don't have a
// a logical plan for streaming write.
case AppendData(r: DataSourceV2Relation, _, _, _, _, _) if
!supportsBatchWrite(r.table) =>
- throw QueryCompilationErrors.unsupportedAppendInBatchModeError(r.table)
+ throw QueryCompilationErrors.unsupportedAppendInBatchModeError(r.name)
case OverwritePartitionsDynamic(r: DataSourceV2Relation, _, _, _, _)
if !r.table.supports(BATCH_WRITE) ||
!r.table.supports(OVERWRITE_DYNAMIC) =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 5b1ff7c67b26..a7694f5d829d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -27,6 +27,7 @@ import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper,
TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException,
NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase,
CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils,
ClusterBySpec, SessionCatalog}
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.catalyst.util.TypeUtils._
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util,
Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces,
Table, TableCatalog, TableCatalogCapability, TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
@@ -36,7 +37,7 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.connector.V1Function
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.ArrayImplicits._
@@ -232,7 +233,21 @@ class V2SessionCatalog(catalog: SessionCatalog)
throw QueryCompilationErrors.tableAlreadyExistsError(ident)
}
- loadTable(ident)
+ val table = loadTable(ident)
+
+ // Check if the schema of the created table matches the given schema.
+ // TODO: move this check in loadTable to match the behavior with
+ // existing file data sources.
+ if (schema.nonEmpty) {
+ val tableSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
+ table.columns().asSchema)
+ if (!DataType.equalsIgnoreNullability(tableSchema, schema)) {
+ throw QueryCompilationErrors.dataSourceTableSchemaMismatchError(
+ table.columns().asSchema, schema)
+ }
+ }
+
+ table
}
private def toOptions(properties: Map[String, String]): Map[String, String]
= {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
index 835566238c9c..5d5ea6499c49 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
@@ -153,7 +153,7 @@ private [connector] trait SessionCatalogTest[T <: Table,
Catalog <: TestV2Sessio
spark.sessionState.catalogManager.catalog(name)
}
- protected val v2Format: String = classOf[FakeV2Provider].getName
+ protected val v2Format: String =
classOf[FakeV2ProviderWithCustomSchema].getName
protected val catalogClassName: String =
classOf[InMemoryTableSessionCatalog].getName
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 47e79e45b737..589283a29b85 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -54,7 +54,7 @@ abstract class DataSourceV2SQLSuite
with DeleteFromTests with DatasourceV2SQLBase with StatsEstimationTestBase
with AdaptiveSparkPlanHelper {
- protected val v2Source = classOf[FakeV2Provider].getName
+ protected val v2Source = classOf[FakeV2ProviderWithCustomSchema].getName
override protected val v2Format = v2Source
protected def doInsert(tableName: String, insert: DataFrame, mode:
SaveMode): Unit = {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index 6e365e1d6059..3f3dc82da5ad 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -698,7 +698,7 @@ class DataSourceV2Suite extends QueryTest with
SharedSparkSession with AdaptiveS
}
test("SPARK-46043: create table in SQL with path option") {
- val cls = classOf[SupportsExternalMetadataDataSource]
+ val cls = classOf[WritableDataSourceSupportsExternalMetadata]
withTempDir { dir =>
val path = s"${dir.getCanonicalPath}/test"
Seq((0, 1), (1, 2)).toDF("x", "y").write.format("csv").save(path)
@@ -725,8 +725,141 @@ class DataSourceV2Suite extends QueryTest with
SharedSparkSession with AdaptiveS
}
}
+ test("SPARK-46272: create table - schema mismatch") {
+ withTable("test") {
+ val cls = classOf[WritableDataSourceSupportsExternalMetadata]
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(s"CREATE TABLE test (x INT, y INT) USING ${cls.getName}")
+ },
+ errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
+ parameters = Map(
+ "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
+ "actualSchema" -> "\"STRUCT<x: INT, y: INT>\""))
+ }
+ }
+
+ test("SPARK-46272: create table as select") {
+ val cls = classOf[WritableDataSourceSupportsExternalMetadata]
+ withTable("test") {
+ sql(
+ s"""
+ |CREATE TABLE test USING ${cls.getName}
+ |AS VALUES (0, 1), (1, 2) t(i, j)
+ |""".stripMargin)
+ checkAnswer(sql("SELECT * FROM test"), Seq((0, 1), (1, 2)).toDF("i",
"j"))
+ sql(
+ s"""
+ |CREATE OR REPLACE TABLE test USING ${cls.getName}
+ |AS VALUES (2, 3), (4, 5) t(i, j)
+ |""".stripMargin)
+ checkAnswer(sql("SELECT * FROM test"), Seq((2, 3), (4, 5)).toDF("i",
"j"))
+ sql(
+ s"""
+ |CREATE TABLE IF NOT EXISTS test USING ${cls.getName}
+ |AS VALUES (3, 4), (4, 5)
+ |""".stripMargin)
+ checkAnswer(sql("SELECT * FROM test"), Seq((2, 3), (4, 5)).toDF("i",
"j"))
+ }
+ }
+
+ test("SPARK-46272: create table as select - schema name mismatch") {
+ val cls = classOf[WritableDataSourceSupportsExternalMetadata]
+ withTable("test") {
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(s"CREATE TABLE test USING ${cls.getName} AS VALUES (0, 1), (1,
2)")
+ },
+ errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
+ parameters = Map(
+ "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
+ "actualSchema" -> "\"STRUCT<col1: INT, col2: INT>\""))
+ }
+ }
+
+ test("SPARK-46272: create table as select - column type mismatch") {
+ val cls = classOf[WritableDataSourceSupportsExternalMetadata]
+ withTable("test") {
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(
+ s"""
+ |CREATE TABLE test USING ${cls.getName}
+ |AS VALUES ('a', 'b'), ('c', 'd') t(i, j)
+ |""".stripMargin)
+ },
+ errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
+ parameters = Map(
+ "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
+ "actualSchema" -> "\"STRUCT<i: STRING, j: STRING>\""))
+ }
+ }
+
+ test("SPARK-46272: create or replace table as select with path options") {
+ val cls = classOf[CustomSchemaAndPartitioningDataSource]
+ withTempDir { dir =>
+ val path = s"${dir.getCanonicalPath}/test"
+ Seq((0, 1), (1, 2)).toDF("x", "y").write.format("csv").save(path)
+ withTable("test") {
+ sql(
+ s"""
+ |CREATE TABLE test USING ${cls.getName}
+ |OPTIONS (PATH '$path')
+ |AS VALUES (0, 1)
+ |""".stripMargin)
+ checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(0, 1),
Row(1, 2)))
+ // Check the data currently in the path location.
+ checkAnswer(
+ spark.read.format("csv").load(path),
+ Seq(Row("0", "1"), Row("0", "1"), Row("1", "2")))
+ // Replace the table with new data.
+ sql(
+ s"""
+ |CREATE OR REPLACE TABLE test USING ${cls.getName}
+ |OPTIONS (PATH '$path')
+ |AS VALUES (2, 3)
+ |""".stripMargin)
+ checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(0, 1),
Row(1, 2), Row(2, 3)))
+ // Replace the table without the path options.
+ sql(
+ s"""
+ |CREATE OR REPLACE TABLE test USING ${cls.getName}
+ |AS VALUES (3, 4)
+ |""".stripMargin)
+ checkAnswer(sql("SELECT * FROM test"), Seq(Row(3, 4)))
+ }
+ }
+ }
+
+ test("SPARK-46272: create table as select with incompatible data sources") {
+ // CTAS with data sources that do not support external metadata.
+ withTable("test") {
+ val cls = classOf[SimpleDataSourceV2]
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ sql(s"CREATE TABLE test USING ${cls.getName} AS VALUES (0, 1)")
+ },
+ errorClass =
"CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+ parameters = Map(
+ "tableName" -> "`default`.`test`",
+ "provider" -> "org.apache.spark.sql.connector.SimpleDataSourceV2"))
+ }
+ // CTAS with data sources that do not support batch write.
+ withTable("test") {
+ val cls = classOf[SchemaRequiredDataSource]
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(s"CREATE TABLE test USING ${cls.getName} AS SELECT * FROM VALUES
(0, 1)")
+ },
+ errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+ parameters = Map(
+ "tableName" -> "`spark_catalog`.`default`.`test`",
+ "operation" -> "append in batch mode"))
+ }
+ }
+
test("SPARK-46273: insert into") {
- val cls = classOf[SupportsExternalMetadataDataSource]
+ val cls = classOf[CustomSchemaAndPartitioningDataSource]
withTable("test") {
sql(
s"""
@@ -766,7 +899,7 @@ class DataSourceV2Suite extends QueryTest with
SharedSparkSession with AdaptiveS
}
test("SPARK-46273: insert overwrite") {
- val cls = classOf[SupportsExternalMetadataDataSource]
+ val cls = classOf[CustomSchemaAndPartitioningDataSource]
withTable("test") {
sql(
s"""
@@ -788,7 +921,7 @@ class DataSourceV2Suite extends QueryTest with
SharedSparkSession with AdaptiveS
}
test("SPARK-46273: insert into with partition") {
- val cls = classOf[SupportsExternalMetadataDataSource]
+ val cls = classOf[CustomSchemaAndPartitioningDataSource]
withTable("test") {
sql(s"CREATE TABLE test(x INT, y INT) USING ${cls.getName} PARTITIONED
BY (x, y)")
sql("INSERT INTO test PARTITION(x = 1) VALUES (2)")
@@ -818,7 +951,7 @@ class DataSourceV2Suite extends QueryTest with
SharedSparkSession with AdaptiveS
}
test("SPARK-46273: insert overwrite with partition") {
- val cls = classOf[SupportsExternalMetadataDataSource]
+ val cls = classOf[CustomSchemaAndPartitioningDataSource]
withTable("test") {
sql(s"CREATE TABLE test (x INT, y INT) USING ${cls.getName} PARTITIONED
BY (x, y)")
sql("INSERT INTO test PARTITION(x = 1) VALUES (2)")
@@ -1334,9 +1467,18 @@ class SimpleWriteOnlyDataSource extends
SimpleWritableDataSource {
}
}
-class SupportsExternalMetadataDataSource extends SimpleWritableDataSource {
+/**
+ * A writable data source that supports external metadata with a fixed schema
(i int, j int).
+ */
+class WritableDataSourceSupportsExternalMetadata extends
SimpleWritableDataSource {
override def supportsExternalMetadata(): Boolean = true
+}
+/**
+ * A writable data source that supports external metadata with
+ * user-specified schema and partitioning.
+ */
+class CustomSchemaAndPartitioningDataSource extends
WritableDataSourceSupportsExternalMetadata {
class TestTable(
schema: StructType,
partitioning: Array[Transform],
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
index 174700e8d24f..25d2d5a67d44 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
@@ -61,3 +61,25 @@ class FakeV2Provider extends TableProvider {
object FakeV2Provider {
val schema: StructType = new StructType().add("i", "int").add("j", "int")
}
+
+class FakeV2ProviderWithCustomSchema extends FakeV2Provider {
+ class FakeTable(
+ schema: StructType,
+ partitioning: Array[Transform],
+ options: CaseInsensitiveStringMap) extends SimpleBatchTable {
+ override def schema(): StructType = schema
+
+ override def partitioning(): Array[Transform] = partitioning
+
+ override def newScanBuilder(options: CaseInsensitiveStringMap):
ScanBuilder = {
+ new MyScanBuilder()
+ }
+ }
+
+ override def getTable(
+ schema: StructType,
+ partitioning: Array[Transform],
+ properties: util.Map[String, String]): Table = {
+ new FakeTable(schema, partitioning, new
CaseInsensitiveStringMap(properties))
+ }
+}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
index fd4f719417e4..b952270fc786 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
@@ -380,7 +380,7 @@ class SupportsCatalogOptionsSuite extends QueryTest with
SharedSparkSession with
}
class CatalogSupportingInMemoryTableProvider
- extends FakeV2Provider
+ extends FakeV2ProviderWithCustomSchema
with SupportsCatalogOptions {
override def extractIdentifier(options: CaseInsensitiveStringMap):
Identifier = {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
index 181dc0ea2074..ad31cf84eeb3 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
@@ -249,7 +249,7 @@ private object InMemoryV1Provider {
}
class InMemoryV1Provider
- extends FakeV2Provider
+ extends FakeV2ProviderWithCustomSchema
with DataSourceRegister
with CreatableRelationProvider {
override def getTable(options: CaseInsensitiveStringMap): Table = {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 7bf81fb98655..5a4f386f1d1d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat,
CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
-import org.apache.spark.sql.connector.{FakeV2Provider,
InMemoryTableSessionCatalog}
+import org.apache.spark.sql.connector.{FakeV2Provider,
FakeV2ProviderWithCustomSchema, InMemoryTableSessionCatalog}
import org.apache.spark.sql.connector.catalog.{Identifier,
InMemoryTableCatalog, MetadataColumn, SupportsMetadataColumns, SupportsRead,
Table, TableCapability, V2TableWithV1Fallback}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.ScanBuilder
@@ -204,7 +204,7 @@ class DataStreamTableAPISuite extends StreamTest with
BeforeAndAfter {
}
test("write: write to table with default session catalog") {
- val v2Source = classOf[FakeV2Provider].getName
+ val v2Source = classOf[FakeV2ProviderWithCustomSchema].getName
spark.conf.set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key,
classOf[InMemoryTableSessionCatalog].getName)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]