This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cc9d9911d7ed [SPARK-46272][SQL] Support CTAS using DSv2 sources
cc9d9911d7ed is described below

commit cc9d9911d7eddfb3bdc7b1fa621687f42930e13a
Author: allisonwang-db <[email protected]>
AuthorDate: Wed Dec 20 16:17:45 2023 +0800

    [SPARK-46272][SQL] Support CTAS using DSv2 sources
    
    ### What changes were proposed in this pull request?
    
    https://github.com/apache/spark/pull/43949 supports CREATE TABLE using DSv2 
sources. This PR supports CREATE TABLE AS SELECT (CTAS) using DSv2 sources. It 
turns out that we don't need additional code changes. This PR simply adds more 
test cases for CTAS queries.
    
    ### Why are the changes needed?
    
    To add tests for CTAS for DSv2 sources.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #44190 from allisonwang-db/spark-46272-ctas.
    
    Authored-by: allisonwang-db <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../src/main/resources/error/error-classes.json    |   6 +
 docs/sql-error-conditions.md                       |   6 +
 .../spark/sql/errors/QueryCompilationErrors.scala  |  13 +-
 .../datasources/v2/TableCapabilityCheck.scala      |   2 +-
 .../datasources/v2/V2SessionCatalog.scala          |  19 ++-
 .../DataSourceV2DataFrameSessionCatalogSuite.scala |   2 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |   2 +-
 .../spark/sql/connector/DataSourceV2Suite.scala    | 154 ++++++++++++++++++++-
 .../spark/sql/connector/FakeV2Provider.scala       |  22 +++
 .../connector/SupportsCatalogOptionsSuite.scala    |   2 +-
 .../spark/sql/connector/V1WriteFallbackSuite.scala |   2 +-
 .../streaming/test/DataStreamTableAPISuite.scala   |   4 +-
 12 files changed, 217 insertions(+), 17 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 30aacc07d318..930042505379 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -893,6 +893,12 @@
     ],
     "sqlState" : "42K02"
   },
+  "DATA_SOURCE_TABLE_SCHEMA_MISMATCH" : {
+    "message" : [
+      "The schema of the data source table <tableSchema> does not match the 
actual schema <actualSchema>. If you are using the DataFrameReader.schema API 
or creating a table, avoid specifying the schema."
+    ],
+    "sqlState" : "42K03"
+  },
   "DATETIME_OVERFLOW" : {
     "message" : [
       "Datetime operation overflow: <operation>."
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index e4b04ce02fe2..94c7c167e392 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -492,6 +492,12 @@ Data source '`<provider>`' not found. Please make sure the 
data source is regist
 
 Failed to find the data source: `<provider>`. Please find packages at 
`https://spark.apache.org/third-party-projects.html`.
 
+### DATA_SOURCE_TABLE_SCHEMA_MISMATCH
+
+[SQLSTATE: 
42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+The schema of the data source table `<tableSchema>` does not match the actual 
schema `<actualSchema>`. If you are using the DataFrameReader.schema API or 
creating a table, avoid specifying the schema.
+
 ### DATETIME_OVERFLOW
 
 [SQLSTATE: 22008](sql-error-conditions-sqlstates.html#class-22-data-exception)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index cdab854c004b..3fd1fe04aed6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -926,8 +926,8 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
     unsupportedTableOperationError(table.name(), "either micro-batch or 
continuous scan")
   }
 
-  def unsupportedAppendInBatchModeError(table: Table): Throwable = {
-    unsupportedTableOperationError(table.name(), "append in batch mode")
+  def unsupportedAppendInBatchModeError(name: String): Throwable = {
+    unsupportedTableOperationError(name, "append in batch mode")
   }
 
   def unsupportedDynamicOverwriteInBatchModeError(table: Table): Throwable = {
@@ -3924,4 +3924,13 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
       errorClass = "NESTED_EXECUTE_IMMEDIATE",
       messageParameters = Map("sqlString" -> toSQLStmt(queryString)))
   }
+
+  def dataSourceTableSchemaMismatchError(
+      tableSchema: StructType, actualSchema: StructType): Throwable = {
+    new AnalysisException(
+      errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
+      messageParameters = Map(
+        "tableSchema" -> toSQLType(tableSchema),
+        "actualSchema" -> toSQLType(actualSchema)))
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
index b1a93addc80b..e332c6b8014a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
@@ -47,7 +47,7 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {
       // TODO: check STREAMING_WRITE capability. It's not doable now because 
we don't have a
       //       a logical plan for streaming write.
       case AppendData(r: DataSourceV2Relation, _, _, _, _, _) if 
!supportsBatchWrite(r.table) =>
-        throw QueryCompilationErrors.unsupportedAppendInBatchModeError(r.table)
+        throw QueryCompilationErrors.unsupportedAppendInBatchModeError(r.name)
 
       case OverwritePartitionsDynamic(r: DataSourceV2Relation, _, _, _, _)
         if !r.table.supports(BATCH_WRITE) || 
!r.table.supports(OVERWRITE_DYNAMIC) =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 5b1ff7c67b26..a7694f5d829d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -27,6 +27,7 @@ import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, 
TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, 
NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, 
CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, 
ClusterBySpec, SessionCatalog}
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.catalyst.util.TypeUtils._
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, 
Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, 
Table, TableCatalog, TableCatalogCapability, TableChange, V1Table}
 import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
@@ -36,7 +37,7 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.connector.V1Function
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.ArrayImplicits._
 
@@ -232,7 +233,21 @@ class V2SessionCatalog(catalog: SessionCatalog)
         throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
 
-    loadTable(ident)
+    val table = loadTable(ident)
+
+    // Check if the schema of the created table matches the given schema.
+    // TODO: move this check in loadTable to match the behavior with
+    // existing file data sources.
+    if (schema.nonEmpty) {
+      val tableSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
+        table.columns().asSchema)
+      if (!DataType.equalsIgnoreNullability(tableSchema, schema)) {
+        throw QueryCompilationErrors.dataSourceTableSchemaMismatchError(
+          table.columns().asSchema, schema)
+      }
+    }
+
+    table
   }
 
   private def toOptions(properties: Map[String, String]): Map[String, String] 
= {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
index 835566238c9c..5d5ea6499c49 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
@@ -153,7 +153,7 @@ private [connector] trait SessionCatalogTest[T <: Table, 
Catalog <: TestV2Sessio
     spark.sessionState.catalogManager.catalog(name)
   }
 
-  protected val v2Format: String = classOf[FakeV2Provider].getName
+  protected val v2Format: String = 
classOf[FakeV2ProviderWithCustomSchema].getName
 
   protected val catalogClassName: String = 
classOf[InMemoryTableSessionCatalog].getName
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 47e79e45b737..589283a29b85 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -54,7 +54,7 @@ abstract class DataSourceV2SQLSuite
   with DeleteFromTests with DatasourceV2SQLBase with StatsEstimationTestBase
   with AdaptiveSparkPlanHelper {
 
-  protected val v2Source = classOf[FakeV2Provider].getName
+  protected val v2Source = classOf[FakeV2ProviderWithCustomSchema].getName
   override protected val v2Format = v2Source
 
   protected def doInsert(tableName: String, insert: DataFrame, mode: 
SaveMode): Unit = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index 6e365e1d6059..3f3dc82da5ad 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -698,7 +698,7 @@ class DataSourceV2Suite extends QueryTest with 
SharedSparkSession with AdaptiveS
   }
 
   test("SPARK-46043: create table in SQL with path option") {
-    val cls = classOf[SupportsExternalMetadataDataSource]
+    val cls = classOf[WritableDataSourceSupportsExternalMetadata]
     withTempDir { dir =>
       val path = s"${dir.getCanonicalPath}/test"
       Seq((0, 1), (1, 2)).toDF("x", "y").write.format("csv").save(path)
@@ -725,8 +725,141 @@ class DataSourceV2Suite extends QueryTest with 
SharedSparkSession with AdaptiveS
     }
   }
 
+  test("SPARK-46272: create table - schema mismatch") {
+    withTable("test") {
+      val cls = classOf[WritableDataSourceSupportsExternalMetadata]
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"CREATE TABLE test (x INT, y INT) USING ${cls.getName}")
+        },
+        errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
+        parameters = Map(
+          "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
+          "actualSchema" -> "\"STRUCT<x: INT, y: INT>\""))
+    }
+  }
+
+  test("SPARK-46272: create table as select") {
+    val cls = classOf[WritableDataSourceSupportsExternalMetadata]
+    withTable("test") {
+      sql(
+        s"""
+           |CREATE TABLE test USING ${cls.getName}
+           |AS VALUES (0, 1), (1, 2) t(i, j)
+           |""".stripMargin)
+      checkAnswer(sql("SELECT * FROM test"), Seq((0, 1), (1, 2)).toDF("i", 
"j"))
+      sql(
+        s"""
+           |CREATE OR REPLACE TABLE test USING ${cls.getName}
+           |AS VALUES (2, 3), (4, 5) t(i, j)
+           |""".stripMargin)
+      checkAnswer(sql("SELECT * FROM test"), Seq((2, 3), (4, 5)).toDF("i", 
"j"))
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS test USING ${cls.getName}
+           |AS VALUES (3, 4), (4, 5)
+           |""".stripMargin)
+      checkAnswer(sql("SELECT * FROM test"), Seq((2, 3), (4, 5)).toDF("i", 
"j"))
+    }
+  }
+
+  test("SPARK-46272: create table as select - schema name mismatch") {
+    val cls = classOf[WritableDataSourceSupportsExternalMetadata]
+    withTable("test") {
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"CREATE TABLE test USING ${cls.getName} AS VALUES (0, 1), (1, 
2)")
+        },
+        errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
+        parameters = Map(
+          "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
+          "actualSchema" -> "\"STRUCT<col1: INT, col2: INT>\""))
+    }
+  }
+
+  test("SPARK-46272: create table as select - column type mismatch") {
+    val cls = classOf[WritableDataSourceSupportsExternalMetadata]
+    withTable("test") {
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(
+            s"""
+               |CREATE TABLE test USING ${cls.getName}
+               |AS VALUES ('a', 'b'), ('c', 'd') t(i, j)
+               |""".stripMargin)
+        },
+        errorClass = "DATA_SOURCE_TABLE_SCHEMA_MISMATCH",
+        parameters = Map(
+          "tableSchema" -> "\"STRUCT<i: INT, j: INT>\"",
+          "actualSchema" -> "\"STRUCT<i: STRING, j: STRING>\""))
+    }
+  }
+
+  test("SPARK-46272: create or replace table as select with path options") {
+    val cls = classOf[CustomSchemaAndPartitioningDataSource]
+    withTempDir { dir =>
+      val path = s"${dir.getCanonicalPath}/test"
+      Seq((0, 1), (1, 2)).toDF("x", "y").write.format("csv").save(path)
+      withTable("test") {
+        sql(
+          s"""
+             |CREATE TABLE test USING ${cls.getName}
+             |OPTIONS (PATH '$path')
+             |AS VALUES (0, 1)
+             |""".stripMargin)
+        checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(0, 1), 
Row(1, 2)))
+        // Check the data currently in the path location.
+        checkAnswer(
+          spark.read.format("csv").load(path),
+          Seq(Row("0", "1"), Row("0", "1"), Row("1", "2")))
+        // Replace the table with new data.
+        sql(
+          s"""
+             |CREATE OR REPLACE TABLE test USING ${cls.getName}
+             |OPTIONS (PATH '$path')
+             |AS VALUES (2, 3)
+             |""".stripMargin)
+        checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(0, 1), 
Row(1, 2), Row(2, 3)))
+        // Replace the table without the path options.
+        sql(
+          s"""
+             |CREATE OR REPLACE TABLE test USING ${cls.getName}
+             |AS VALUES (3, 4)
+             |""".stripMargin)
+        checkAnswer(sql("SELECT * FROM test"), Seq(Row(3, 4)))
+      }
+    }
+  }
+
+  test("SPARK-46272: create table as select with incompatible data sources") {
+    // CTAS with data sources that do not support external metadata.
+    withTable("test") {
+      val cls = classOf[SimpleDataSourceV2]
+      checkError(
+        exception = intercept[SparkUnsupportedOperationException] {
+          sql(s"CREATE TABLE test USING ${cls.getName} AS VALUES (0, 1)")
+        },
+        errorClass = 
"CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+        parameters = Map(
+          "tableName" -> "`default`.`test`",
+          "provider" -> "org.apache.spark.sql.connector.SimpleDataSourceV2"))
+    }
+    // CTAS with data sources that do not support batch write.
+    withTable("test") {
+      val cls = classOf[SchemaRequiredDataSource]
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"CREATE TABLE test USING ${cls.getName} AS SELECT * FROM VALUES 
(0, 1)")
+        },
+        errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+        parameters = Map(
+          "tableName" -> "`spark_catalog`.`default`.`test`",
+          "operation" -> "append in batch mode"))
+    }
+  }
+
   test("SPARK-46273: insert into") {
-    val cls = classOf[SupportsExternalMetadataDataSource]
+    val cls = classOf[CustomSchemaAndPartitioningDataSource]
     withTable("test") {
       sql(
         s"""
@@ -766,7 +899,7 @@ class DataSourceV2Suite extends QueryTest with 
SharedSparkSession with AdaptiveS
   }
 
   test("SPARK-46273: insert overwrite") {
-    val cls = classOf[SupportsExternalMetadataDataSource]
+    val cls = classOf[CustomSchemaAndPartitioningDataSource]
     withTable("test") {
       sql(
         s"""
@@ -788,7 +921,7 @@ class DataSourceV2Suite extends QueryTest with 
SharedSparkSession with AdaptiveS
   }
 
   test("SPARK-46273: insert into with partition") {
-    val cls = classOf[SupportsExternalMetadataDataSource]
+    val cls = classOf[CustomSchemaAndPartitioningDataSource]
     withTable("test") {
       sql(s"CREATE TABLE test(x INT, y INT) USING ${cls.getName} PARTITIONED 
BY (x, y)")
       sql("INSERT INTO test PARTITION(x = 1) VALUES (2)")
@@ -818,7 +951,7 @@ class DataSourceV2Suite extends QueryTest with 
SharedSparkSession with AdaptiveS
   }
 
   test("SPARK-46273: insert overwrite with partition") {
-    val cls = classOf[SupportsExternalMetadataDataSource]
+    val cls = classOf[CustomSchemaAndPartitioningDataSource]
     withTable("test") {
       sql(s"CREATE TABLE test (x INT, y INT) USING ${cls.getName} PARTITIONED 
BY (x, y)")
       sql("INSERT INTO test PARTITION(x = 1) VALUES (2)")
@@ -1334,9 +1467,18 @@ class SimpleWriteOnlyDataSource extends 
SimpleWritableDataSource {
   }
 }
 
-class SupportsExternalMetadataDataSource extends SimpleWritableDataSource {
+/**
+ * A writable data source that supports external metadata with a fixed schema 
(i int, j int).
+ */
+class WritableDataSourceSupportsExternalMetadata extends 
SimpleWritableDataSource {
   override def supportsExternalMetadata(): Boolean = true
+}
 
+/**
+ * A writable data source that supports external metadata with
+ * user-specified schema and partitioning.
+ */
+class CustomSchemaAndPartitioningDataSource extends 
WritableDataSourceSupportsExternalMetadata {
   class TestTable(
       schema: StructType,
       partitioning: Array[Transform],
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
index 174700e8d24f..25d2d5a67d44 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
@@ -61,3 +61,25 @@ class FakeV2Provider extends TableProvider {
 object FakeV2Provider {
   val schema: StructType = new StructType().add("i", "int").add("j", "int")
 }
+
+class FakeV2ProviderWithCustomSchema extends FakeV2Provider {
+  class FakeTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      options: CaseInsensitiveStringMap) extends SimpleBatchTable {
+    override def schema(): StructType = schema
+
+    override def partitioning(): Array[Transform] = partitioning
+
+    override def newScanBuilder(options: CaseInsensitiveStringMap): 
ScanBuilder = {
+      new MyScanBuilder()
+    }
+  }
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+    new FakeTable(schema, partitioning, new 
CaseInsensitiveStringMap(properties))
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
index fd4f719417e4..b952270fc786 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
@@ -380,7 +380,7 @@ class SupportsCatalogOptionsSuite extends QueryTest with 
SharedSparkSession with
 }
 
 class CatalogSupportingInMemoryTableProvider
-  extends FakeV2Provider
+  extends FakeV2ProviderWithCustomSchema
   with SupportsCatalogOptions {
 
   override def extractIdentifier(options: CaseInsensitiveStringMap): 
Identifier = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
index 181dc0ea2074..ad31cf84eeb3 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
@@ -249,7 +249,7 @@ private object InMemoryV1Provider {
 }
 
 class InMemoryV1Provider
-  extends FakeV2Provider
+  extends FakeV2ProviderWithCustomSchema
   with DataSourceRegister
   with CreatableRelationProvider {
   override def getTable(options: CaseInsensitiveStringMap): Table = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index 7bf81fb98655..5a4f386f1d1d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
-import org.apache.spark.sql.connector.{FakeV2Provider, 
InMemoryTableSessionCatalog}
+import org.apache.spark.sql.connector.{FakeV2Provider, 
FakeV2ProviderWithCustomSchema, InMemoryTableSessionCatalog}
 import org.apache.spark.sql.connector.catalog.{Identifier, 
InMemoryTableCatalog, MetadataColumn, SupportsMetadataColumns, SupportsRead, 
Table, TableCapability, V2TableWithV1Fallback}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.ScanBuilder
@@ -204,7 +204,7 @@ class DataStreamTableAPISuite extends StreamTest with 
BeforeAndAfter {
   }
 
   test("write: write to table with default session catalog") {
-    val v2Source = classOf[FakeV2Provider].getName
+    val v2Source = classOf[FakeV2ProviderWithCustomSchema].getName
     spark.conf.set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key,
       classOf[InMemoryTableSessionCatalog].getName)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to