This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fec76dc8db2 [SPARK-46043][SQL] Support create table using DSv2 sources
5fec76dc8db2 is described below

commit 5fec76dc8db2499b0a9d76231f9a250871d59658
Author: allisonwang-db <[email protected]>
AuthorDate: Tue Dec 5 09:35:14 2023 +0800

    [SPARK-46043][SQL] Support create table using DSv2 sources
    
    ### What changes were proposed in this pull request?
    
    This PR supports `CREATE TABLE ... USING source` for DSv2 sources.
    
    ### Why are the changes needed?
    
    To support creating DSv2 tables in SQL. Currently the table create can work 
but when you select a dsv2 table created in SQL, it fails with this error:
    ```
    org.apache.spark.sql.AnalysisException: 
org.apache.spark.sql.connector.SimpleDataSourceV2 is not a valid Spark SQL Data 
Source.
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43949 from allisonwang-db/spark-46043-dsv2-create-table.
    
    Authored-by: allisonwang-db <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../src/main/resources/error/error-classes.json    |  13 +++
 ...-cannot-create-data-source-table-error-class.md |  32 ++++++
 docs/sql-error-conditions.md                       |   8 ++
 .../spark/sql/catalyst/util/QuotingUtils.scala     |   8 ++
 .../sql/connector/catalog/CatalogV2Implicits.scala |   6 ++
 .../datasources/v2/DataSourceV2Relation.scala      |  19 +++-
 .../catalyst/analysis/ResolveSessionCatalog.scala  |  14 +--
 .../datasources/v2/DataSourceV2Utils.scala         |  16 +++
 .../datasources/v2/V2SessionCatalog.scala          | 104 +++++++++++++++++---
 .../connector/JavaSchemaRequiredDataSource.java    |   4 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |  15 +--
 .../spark/sql/connector/DataSourceV2Suite.scala    | 108 ++++++++++++++++++++-
 .../spark/sql/connector/FakeV2Provider.scala       |  63 ++++++++++++
 .../spark/sql/connector/InsertIntoTests.scala      |  12 ++-
 .../sql/connector/TestV2SessionCatalogBase.scala   |   5 +-
 .../spark/sql/connector/V1WriteFallbackSuite.scala |   3 +-
 .../execution/command/PlanResolutionSuite.scala    |   2 +-
 17 files changed, 384 insertions(+), 48 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index a808be9510cf..e54d346e1bc1 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -149,6 +149,19 @@
     ],
     "sqlState" : "42846"
   },
+  "CANNOT_CREATE_DATA_SOURCE_TABLE" : {
+    "message" : [
+      "Failed to create data source table <tableName>:"
+    ],
+    "subClass" : {
+      "EXTERNAL_METADATA_UNSUPPORTED" : {
+        "message" : [
+          "provider '<provider>' does not support external metadata but a 
schema is provided. Please remove the schema when creating the table."
+        ]
+      }
+    },
+    "sqlState" : "42KDE"
+  },
   "CANNOT_DECODE_URL" : {
     "message" : [
       "The provided URL cannot be decoded: <url>. Please ensure that the URL 
is properly formatted and try again."
diff --git 
a/docs/sql-error-conditions-cannot-create-data-source-table-error-class.md 
b/docs/sql-error-conditions-cannot-create-data-source-table-error-class.md
new file mode 100644
index 000000000000..f52e4f462bc9
--- /dev/null
+++ b/docs/sql-error-conditions-cannot-create-data-source-table-error-class.md
@@ -0,0 +1,32 @@
+---
+layout: global
+title: CANNOT_CREATE_DATA_SOURCE_TABLE error class
+displayTitle: CANNOT_CREATE_DATA_SOURCE_TABLE error class
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+[SQLSTATE: 
42KDE](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Failed to create data source table `<tableName>`:
+
+This error class has the following derived error classes:
+
+## EXTERNAL_METADATA_UNSUPPORTED
+
+provider '`<provider>`' does not support external metadata but a schema is 
provided. Please remove the schema when creating the table.
+
+
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 8074ba9233d0..c9990d3856cd 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -163,6 +163,14 @@ Cannot convert SQL `<sqlColumn>` to Protobuf 
`<protobufColumn>` because schema i
 
 Cannot convert SQL `<sqlColumn>` to Protobuf `<protobufColumn>` because 
`<data>` is not in defined values for enum: `<enumString>`.
 
+### 
[CANNOT_CREATE_DATA_SOURCE_TABLE](sql-error-conditions-cannot-create-data-source-table-error-class.html)
+
+[SQLSTATE: 
42KDE](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Failed to create data source table `<tableName>`:
+
+For more details see 
[CANNOT_CREATE_DATA_SOURCE_TABLE](sql-error-conditions-cannot-create-data-source-table-error-class.html)
+
 ### CANNOT_DECODE_URL
 
 [SQLSTATE: 22546](sql-error-conditions-sqlstates.html#class-22-data-exception)
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/QuotingUtils.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/QuotingUtils.scala
index 43af533b85a4..97d110f7ffc5 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/QuotingUtils.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/QuotingUtils.scala
@@ -61,6 +61,14 @@ object QuotingUtils {
     }
   }
 
+  def fullyQuoted(ident: Identifier): String = {
+    if (ident.namespace.nonEmpty) {
+      ident.namespace.map(quoteIdentifier).mkString(".") + "." + 
quoteIdentifier(ident.name)
+    } else {
+      quoteIdentifier(ident.name)
+    }
+  }
+
   def escapeSingleQuotedString(str: String): String = {
     val builder = new StringBuilder
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
index 133011ad9fac..2b712241633b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
@@ -144,10 +144,16 @@ private[sql] object CatalogV2Implicits {
   }
 
   implicit class IdentifierHelper(ident: Identifier) {
+    /* Quote the identifier if needed. */
     def quoted: String = {
       QuotingUtils.quoted(ident)
     }
 
+    /* Always quote the identifier. */
+    def fullyQuoted: String = {
+      QuotingUtils.fullyQuoted(ident)
+    }
+
     def original: String = ident.namespace() :+ ident.name() mkString "."
 
     def asMultipartIdentifier: Seq[String] = (ident.namespace :+ 
ident.name).toImmutableArraySeq
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 92638a152871..573b0274e958 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
AttributeReference, Expression, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, 
ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
-import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
+import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, truncatedString, 
CharVarcharUtils}
 import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, 
Identifier, SupportsMetadataColumns, Table, TableCapability}
 import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, 
SupportsReportStatistics}
 import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
@@ -61,7 +62,19 @@ case class DataSourceV2Relation(
       Nil
   }
 
-  override def name: String = table.name()
+  override def name: String = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+    (catalog, identifier) match {
+      case (Some(cat), Some(ident)) => 
s"${quoteIfNeeded(cat.name())}.${ident.quoted}"
+      case (None, None) => table.name()
+      case _ =>
+        throw SparkException.internalError(
+          "Invalid catalog and identifier pair. Both 'catalog' and 
'identifier' must be " +
+            s"specified or leave as None. Current input - " +
+            s"catalog: '${catalog.map(_.name()).getOrElse(None)}', " +
+            s"identifier: ${identifier.map(_.quoted).getOrElse(None)}.")
+    }
+  }
 
   override def skipSchemaResolution: Boolean = 
table.supports(TableCapability.ACCEPT_ANY_SCHEMA)
 
@@ -127,7 +140,7 @@ case class DataSourceV2ScanRelation(
     keyGroupedPartitioning: Option[Seq[Expression]] = None,
     ordering: Option[Seq[SortOrder]] = None) extends LeafNode with 
NamedRelation {
 
-  override def name: String = relation.table.name()
+  override def name: String = relation.name
 
   override def simpleString(maxFields: Int): String = {
     s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index b4c549a90191..d44de0b260b2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -31,8 +31,8 @@ import 
org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Lo
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{CreateTable => 
CreateTableV1, DataSource}
-import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
+import org.apache.spark.sql.execution.datasources.{CreateTable => 
CreateTableV1}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.internal.connector.V1Function
 import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
@@ -612,15 +612,7 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
   }
 
   private def isV2Provider(provider: String): Boolean = {
-    // Return earlier since `lookupDataSourceV2` may fail to resolve provider 
"hive" to
-    // `HiveFileFormat`, when running tests in sql/core.
-    if (DDLUtils.isHiveTable(Some(provider))) return false
-    DataSource.lookupDataSourceV2(provider, conf) match {
-      // TODO(SPARK-28396): Currently file source v2 can't work with tables.
-      case Some(_: FileDataSourceV2) => false
-      case Some(_) => true
-      case _ => false
-    }
+    DataSourceV2Utils.getTableProvider(provider, conf).isDefined
   }
 
   private object DatabaseInSessionCatalog {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index ef8e4605f472..9ffa0d728ca2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -31,6 +31,8 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, 
SessionConfigSupport, SupportsCatalogOptions, SupportsRead, Table, 
TableProvider}
 import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{LongType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -151,6 +153,20 @@ private[sql] object DataSourceV2Utils extends Logging {
     }
   }
 
+  /**
+   * Returns the table provider for the given format, or None if it cannot be 
found.
+   */
+  def getTableProvider(provider: String, conf: SQLConf): Option[TableProvider] 
= {
+    // Return earlier since `lookupDataSourceV2` may fail to resolve provider 
"hive" to
+    // `HiveFileFormat`, when running tests in sql/core.
+    if (DDLUtils.isHiveTable(Some(provider))) return None
+    DataSource.lookupDataSourceV2(provider, conf) match {
+      // TODO(SPARK-28396): Currently file source v2 can't work with tables.
+      case Some(p) if !p.isInstanceOf[FileDataSourceV2] => Some(p)
+      case _ => None
+    }
+  }
+
   private lazy val objectMapper = new ObjectMapper()
   def getOptionsWithPaths(
       extraOptions: CaseInsensitiveMap[String],
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 5a8f2e2d8a40..5b1ff7c67b26 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -23,9 +23,10 @@ import java.util
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, 
TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, 
NoSuchTableException, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, 
CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, 
CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, 
ClusterBySpec, SessionCatalog}
 import org.apache.spark.sql.catalyst.util.TypeUtils._
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, 
Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, 
Table, TableCatalog, TableCatalogCapability, TableChange, V1Table}
 import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
@@ -71,9 +72,44 @@ class V2SessionCatalog(catalog: SessionCatalog)
     }
   }
 
+  // Get data source options from the catalog table properties with the path 
option.
+  private def getDataSourceOptions(
+      properties: Map[String, String],
+      storage: CatalogStorageFormat): CaseInsensitiveStringMap = {
+    val propertiesWithPath = properties ++
+      storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+    new CaseInsensitiveStringMap(propertiesWithPath.asJava)
+  }
+
   override def loadTable(ident: Identifier): Table = {
     try {
-      V1Table(catalog.getTableMetadata(ident.asTableIdentifier))
+      val table = catalog.getTableMetadata(ident.asTableIdentifier)
+      if (table.provider.isDefined) {
+        DataSourceV2Utils.getTableProvider(table.provider.get, conf) match {
+          case Some(provider) =>
+            // Get the table properties during creation and append the path 
option
+            // to the properties.
+            val dsOptions = getDataSourceOptions(table.properties, 
table.storage)
+            // If the source accepts external table metadata, we can pass the 
schema and
+            // partitioning information stored in Hive to `getTable` to avoid 
expensive
+            // schema/partitioning inference.
+            if (provider.supportsExternalMetadata()) {
+              provider.getTable(
+                table.schema,
+                getV2Partitioning(table),
+                dsOptions.asCaseSensitiveMap())
+            } else {
+              provider.getTable(
+                provider.inferSchema(dsOptions),
+                provider.inferPartitioning(dsOptions),
+                dsOptions.asCaseSensitiveMap())
+            }
+          case _ =>
+            V1Table(table)
+        }
+      } else {
+        V1Table(table)
+      }
     } catch {
       case _: NoSuchDatabaseException =>
         throw QueryCompilationErrors.noSuchTableError(ident)
@@ -96,6 +132,16 @@ class V2SessionCatalog(catalog: SessionCatalog)
     throw QueryCompilationErrors.timeTravelUnsupportedError(toSQLId(nameParts))
   }
 
+  private def getV2Partitioning(table: CatalogTable): Array[Transform] = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+    val v2Partitioning = table.partitionColumnNames.asTransforms
+    val v2Bucketing = table.bucketSpec.map(
+      spec => Array(spec.asTransform)).getOrElse(Array.empty)
+    val v2Clustering = table.clusterBySpec.map(
+      spec => Array(spec.asTransform)).getOrElse(Array.empty)
+    v2Partitioning ++ v2Bucketing ++ v2Clustering
+  }
+
   override def invalidateTable(ident: Identifier): Unit = {
     catalog.refreshTable(ident.asTableIdentifier)
   }
@@ -114,14 +160,12 @@ class V2SessionCatalog(catalog: SessionCatalog)
       schema: StructType,
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table = {
-    import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
-    val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) =
-      partitions.toImmutableArraySeq.convertTransforms
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
     val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, 
conf.defaultDataSourceName)
-    val tableProperties = properties.asScala
+    val tableProperties = properties.asScala.toMap
     val location = Option(properties.get(TableCatalog.PROP_LOCATION))
-    val storage = 
DataSource.buildStorageFormatFromOptions(toOptions(tableProperties.toMap))
-        .copy(locationUri = location.map(CatalogUtils.stringToURI))
+    val storage = 
DataSource.buildStorageFormatFromOptions(toOptions(tableProperties))
+      .copy(locationUri = location.map(CatalogUtils.stringToURI))
     val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL)
     val tableType = if (isExternal || location.isDefined) {
       CatalogTableType.EXTERNAL
@@ -129,17 +173,55 @@ class V2SessionCatalog(catalog: SessionCatalog)
       CatalogTableType.MANAGED
     }
 
+    val (newSchema, newPartitions) = 
DataSourceV2Utils.getTableProvider(provider, conf) match {
+      // If the provider does not support external metadata, users should not 
be allowed to
+      // specify custom schema when creating the data source table, since the 
schema will not
+      // be used when loading the table.
+      case Some(p) if !p.supportsExternalMetadata() =>
+        if (schema.nonEmpty) {
+          throw new SparkUnsupportedOperationException(
+            errorClass = 
"CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+            messageParameters = Map("tableName" -> ident.fullyQuoted, 
"provider" -> provider))
+        }
+        // V2CreateTablePlan does not allow non-empty partitions when schema 
is empty. This
+        // is checked in `PreProcessTableCreation` rule.
+        assert(partitions.isEmpty,
+          s"Partitions should be empty when the schema is empty: 
${partitions.mkString(", ")}")
+        (schema, partitions)
+
+      case Some(tableProvider) =>
+        assert(tableProvider.supportsExternalMetadata())
+        lazy val dsOptions = getDataSourceOptions(tableProperties, storage)
+        if (schema.isEmpty) {
+          assert(partitions.isEmpty,
+            s"Partitions should be empty when the schema is empty: 
${partitions.mkString(", ")}")
+          // Infer the schema and partitions and store them in the catalog.
+          (tableProvider.inferSchema(dsOptions), 
tableProvider.inferPartitioning(dsOptions))
+        } else if (partitions.isEmpty) {
+          (schema, tableProvider.inferPartitioning(dsOptions))
+        } else {
+          (schema, partitions)
+        }
+
+      case _ =>
+        // The provider is not a V2 provider so we return the schema and 
partitions as is.
+        (schema, partitions)
+    }
+
+    val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) =
+      newPartitions.toImmutableArraySeq.convertTransforms
+
     val tableDesc = CatalogTable(
       identifier = ident.asTableIdentifier,
       tableType = tableType,
       storage = storage,
-      schema = schema,
+      schema = newSchema,
       provider = Some(provider),
       partitionColumnNames = partitionColumns,
       bucketSpec = maybeBucketSpec,
-      properties = tableProperties.toMap ++
+      properties = tableProperties ++
         maybeClusterBySpec.map(
-          clusterBySpec => ClusterBySpec.toProperty(schema, clusterBySpec, 
conf.resolver)),
+          clusterBySpec => ClusterBySpec.toProperty(newSchema, clusterBySpec, 
conf.resolver)),
       tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
       comment = Option(properties.get(TableCatalog.PROP_COMMENT)))
 
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSchemaRequiredDataSource.java
 
b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSchemaRequiredDataSource.java
index e7d728fc5a08..91b6621e7767 100644
--- 
a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSchemaRequiredDataSource.java
+++ 
b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSchemaRequiredDataSource.java
@@ -44,7 +44,9 @@ public class JavaSchemaRequiredDataSource implements 
TableProvider {
 
     @Override
     public InputPartition[] planInputPartitions() {
-      return new InputPartition[0];
+      InputPartition[] partitions = new InputPartition[1];
+      partitions[0] = new JavaRangeInputPartition(0, 2);
+      return partitions;
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index b92b512aa1d3..2b93e8bd3200 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -45,10 +45,8 @@ import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, 
PartitionOverwriteMode, V2_SESSION_CATALOG_IMPLEMENTATION}
-import org.apache.spark.sql.internal.connector.SimpleTableProvider
 import org.apache.spark.sql.sources.SimpleScanSource
 import org.apache.spark.sql.types.{LongType, StringType, StructType}
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.unsafe.types.UTF8String
 
 abstract class DataSourceV2SQLSuite
@@ -1120,7 +1118,7 @@ class DataSourceV2SQLSuiteV1Filter
         },
         errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
         parameters = Map(
-          "tableName" -> "`default`.`tbl`",
+          "tableName" -> "`spark_catalog`.`default`.`tbl`",
           "tableColumns" -> "`id`, `data`",
           "dataColumns" -> "`col1`"
         )
@@ -1155,7 +1153,7 @@ class DataSourceV2SQLSuiteV1Filter
         },
         errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
         parameters = Map(
-          "tableName" -> "`default`.`tbl`",
+          "tableName" -> "`spark_catalog`.`default`.`tbl`",
           "tableColumns" -> "`id`, `data`",
           "dataColumns" -> "`col1`"
         )
@@ -1191,7 +1189,7 @@ class DataSourceV2SQLSuiteV1Filter
         },
         errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
         parameters = Map(
-          "tableName" -> "`default`.`tbl`",
+          "tableName" -> "`spark_catalog`.`default`.`tbl`",
           "tableColumns" -> "`id`, `data`, `data2`",
           "dataColumns" -> "`col1`, `col2`"
         )
@@ -3390,13 +3388,6 @@ class DataSourceV2SQLSuiteV2Filter extends 
DataSourceV2SQLSuite {
   override protected val catalogAndNamespace = "testv2filter.ns1.ns2."
 }
 
-/** Used as a V2 DataSource for V2SessionCatalog DDL */
-class FakeV2Provider extends SimpleTableProvider {
-  override def getTable(options: CaseInsensitiveStringMap): Table = {
-    throw new UnsupportedOperationException("Unnecessary for DDL tests")
-  }
-}
-
 class ReserveSchemaNullabilityCatalog extends InMemoryCatalog {
   override def useNullableQuerySchema(): Boolean = false
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index e61f8cb0bf06..f2e518e8acc4 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -22,6 +22,7 @@ import java.util.OptionalLong
 
 import test.org.apache.spark.sql.connector._
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.catalog.{PartitionInternalRow, 
SupportsRead, Table, TableCapability, TableProvider}
@@ -231,11 +232,11 @@ class DataSourceV2Suite extends QueryTest with 
SharedSparkSession with AdaptiveS
         val e = 
intercept[IllegalArgumentException](spark.read.format(cls.getName).load())
         assert(e.getMessage.contains("requires a user-supplied schema"))
 
-        val schema = new StructType().add("i", "int").add("s", "string")
+        val schema = new StructType().add("i", "int").add("j", "int")
         val df = spark.read.format(cls.getName).schema(schema).load()
 
         assert(df.schema == schema)
-        assert(df.collect().isEmpty)
+        checkAnswer(df, Seq(Row(0, 0), Row(1, -1)))
       }
     }
   }
@@ -633,6 +634,95 @@ class DataSourceV2Suite extends QueryTest with 
SharedSparkSession with AdaptiveS
       }
     }
   }
+
+  test("SPARK-46043: create table in SQL using a DSv2 source") {
+    Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach 
{ cls =>
+      withClue(cls.getName) {
+        // Create a table with empty schema.
+        withTable("test") {
+          sql(s"CREATE TABLE test USING ${cls.getName}")
+          checkAnswer(
+            sql(s"SELECT * FROM test WHERE i < 3"),
+            Seq(Row(0, 0), Row(1, -1), Row(2, -2)))
+        }
+        // Create a table with non-empty schema is not allowed.
+        checkError(
+          exception = intercept[SparkUnsupportedOperationException] {
+            sql(s"CREATE TABLE test(a INT, b INT) USING ${cls.getName}")
+          },
+          errorClass = 
"CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+          parameters = Map("tableName" -> "`default`.`test`", "provider" -> 
cls.getName)
+        )
+      }
+    }
+  }
+
+  test("SPARK-46043: create table in SQL with schema required data source") {
+    val cls = classOf[SchemaRequiredDataSource]
+    val e = intercept[IllegalArgumentException] {
+      sql(s"CREATE TABLE test USING ${cls.getName}")
+    }
+    assert(e.getMessage.contains("requires a user-supplied schema"))
+    withTable("test") {
+      sql(s"CREATE TABLE test(i INT, j INT) USING ${cls.getName}")
+      checkAnswer(sql(s"SELECT * FROM test"), Seq(Row(0, 0), Row(1, -1)))
+    }
+    withTable("test") {
+      sql(s"CREATE TABLE test(i INT) USING ${cls.getName}")
+      checkAnswer(sql(s"SELECT * FROM test"), Seq(Row(0), Row(1)))
+    }
+    withTable("test") {
+      // Test the behavior when there is a mismatch between the schema defined 
in the
+      // CREATE TABLE command and the actual schema produced by the data 
source. The
+      // resulting behavior is not guaranteed and may vary based on the data 
source's
+      // implementation.
+      sql(s"CREATE TABLE test(i INT, j INT, k INT) USING ${cls.getName}")
+      val e = intercept[Exception] {
+        sql("SELECT * FROM test").collect()
+      }
+      assert(e.getMessage.contains(
+        "java.lang.ArrayIndexOutOfBoundsException: Index 2 out of bounds for 
length 2"))
+    }
+  }
+
+  test("SPARK-46043: create table in SQL with partitioning required data 
source") {
+    val cls = classOf[PartitionsRequiredDataSource]
+    val e = intercept[IllegalArgumentException](
+      sql(s"CREATE TABLE test(a INT) USING ${cls.getName}"))
+    assert(e.getMessage.contains("user-supplied partitioning"))
+    withTable("test") {
+      sql(s"CREATE TABLE test(i INT, j INT) USING ${cls.getName} PARTITIONED 
BY (i)")
+      checkAnswer(sql(s"SELECT * FROM test"), Seq(Row(0, 0), Row(1, -1)))
+    }
+  }
+
+  test("SPARK-46043: create table in SQL with path option") {
+    val cls = classOf[SupportsExternalMetadataDataSource]
+    withTempDir { dir =>
+      val path = s"${dir.getCanonicalPath}/test"
+      Seq((0, 1), (1, 2)).toDF("x", "y").write.format("csv").save(path)
+      withTable("test") {
+        sql(
+          s"""
+             |CREATE TABLE test USING ${cls.getName}
+             |OPTIONS (PATH '$path')
+             |""".stripMargin)
+        checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(1, 2)))
+        sql(
+          s"""
+             |CREATE OR REPLACE TABLE test USING ${cls.getName}
+             |OPTIONS (PATH '${dir.getCanonicalPath}/non-existing')
+             |""".stripMargin)
+        checkAnswer(sql("SELECT * FROM test"), Nil)
+        sql(
+          s"""
+             |CREATE OR REPLACE TABLE test USING ${cls.getName}
+             |LOCATION '$path'
+             |""".stripMargin)
+        checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(1, 2)))
+      }
+    }
+  }
 }
 
 
@@ -910,7 +1000,9 @@ class AdvancedReaderFactory(requiredSchema: StructType) 
extends PartitionReaderF
 class SchemaRequiredDataSource extends TableProvider {
 
   class MyScanBuilder(schema: StructType) extends SimpleScanBuilder {
-    override def planInputPartitions(): Array[InputPartition] = Array.empty
+    override def planInputPartitions(): Array[InputPartition] = {
+      Array(RangeInputPartition(0, 2))
+    }
 
     override def readSchema(): StructType = schema
   }
@@ -936,6 +1028,12 @@ class SchemaRequiredDataSource extends TableProvider {
   }
 }
 
+class PartitionsRequiredDataSource extends SchemaRequiredDataSource {
+  override def inferPartitioning(options: CaseInsensitiveStringMap): 
Array[Transform] = {
+    throw new IllegalArgumentException("requires user-supplied partitioning")
+  }
+}
+
 class ColumnarDataSourceV2 extends TestingV2Source {
 
   class MyScanBuilder extends SimpleScanBuilder {
@@ -1127,6 +1225,10 @@ class SimpleWriteOnlyDataSource extends 
SimpleWritableDataSource {
   }
 }
 
+class SupportsExternalMetadataDataSource extends SimpleWritableDataSource {
+  override def supportsExternalMetadata(): Boolean = true
+}
+
 class SupportsExternalMetadataWritableDataSource extends 
SimpleWritableDataSource {
   override def supportsExternalMetadata(): Boolean = true
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
new file mode 100644
index 000000000000..174700e8d24f
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import java.util
+
+import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.read.{InputPartition, ScanBuilder}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/** Used as a V2 DataSource for V2SessionCatalog DDL */
+class FakeV2Provider extends TableProvider {
+
+  // Supports external metadata so that users can specify schema and partitions
+  // when creating the table.
+  override def supportsExternalMetadata(): Boolean = true
+
+  class MyScanBuilder extends SimpleScanBuilder {
+    override def planInputPartitions(): Array[InputPartition] = {
+      Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10))
+    }
+  }
+
+  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+    FakeV2Provider.schema
+  }
+
+  def getTable(options: CaseInsensitiveStringMap): Table = {
+    new SimpleBatchTable {
+      override def newScanBuilder(options: CaseInsensitiveStringMap): 
ScanBuilder = {
+        new MyScanBuilder()
+      }
+    }
+  }
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+    getTable(new CaseInsensitiveStringMap(properties))
+  }
+}
+
+object FakeV2Provider {
+  val schema: StructType = new StructType().add("i", "int").add("j", "int")
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
index 63bb0148972c..fa30969d65c5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
@@ -126,7 +126,11 @@ abstract class InsertIntoTests(
     val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
 
     verifyTable(t1, Seq.empty[(Long, String, String)].toDF("id", "data", 
"missing"))
-    val tableName = if (catalogAndNamespace.isEmpty) toSQLId(s"default.$t1") 
else toSQLId(t1)
+    val tableName = if (catalogAndNamespace.isEmpty) {
+      toSQLId(s"spark_catalog.default.$t1")
+    } else {
+      toSQLId(t1)
+    }
     checkError(
       exception = intercept[AnalysisException] {
         doInsert(t1, df)
@@ -145,7 +149,11 @@ abstract class InsertIntoTests(
       sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
       val df = Seq((1L, "a", "mango")).toDF("id", "data", "fruit")
       verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data"))
-      val tableName = if (catalogAndNamespace.isEmpty) toSQLId(s"default.$t1") 
else toSQLId(t1)
+      val tableName = if (catalogAndNamespace.isEmpty) {
+        toSQLId(s"spark_catalog.default.$t1")
+      } else {
+        toSQLId(t1)
+      }
       checkError(
         exception = intercept[AnalysisException] {
           doInsert(t1, df)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
index 1396ef82925a..719b006c1460 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
@@ -78,6 +78,7 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] 
extends Delegating
       schema: StructType,
       partitions: Array[Transform],
       properties: java.util.Map[String, String]): Table = {
+    import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
     val key = TestV2SessionCatalogBase.SIMULATE_ALLOW_EXTERNAL_PROPERTY
     val propsWithLocation = if (properties.containsKey(key)) {
       // Always set a location so that CREATE EXTERNAL TABLE won't fail with 
LOCATION not specified.
@@ -92,8 +93,8 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] 
extends Delegating
     } else {
       properties
     }
-    val created = super.createTable(ident, schema, partitions, 
propsWithLocation)
-    val t = newTable(created.name(), schema, partitions, propsWithLocation)
+    super.createTable(ident, schema, partitions, propsWithLocation)
+    val t = newTable(ident.quoted, schema, partitions, propsWithLocation)
     addTable(ident, t)
     t
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
index ec62739b9cf2..181dc0ea2074 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
@@ -36,7 +36,6 @@ import 
org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoI
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.internal.SQLConf.{OPTIMIZER_MAX_ITERATIONS, 
V2_SESSION_CATALOG_IMPLEMENTATION}
-import org.apache.spark.sql.internal.connector.SimpleTableProvider
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
@@ -250,7 +249,7 @@ private object InMemoryV1Provider {
 }
 
 class InMemoryV1Provider
-  extends SimpleTableProvider
+  extends FakeV2Provider
   with DataSourceRegister
   with CreatableRelationProvider {
   override def getTable(options: CaseInsensitiveStringMap): Table = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index e39cc91d5f04..69b3285fc7f1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -1275,7 +1275,7 @@ class PlanResolutionSuite extends AnalysisTest {
       },
       errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
       parameters = Map(
-        "tableName" -> "`tab2`",
+        "tableName" -> "`testcat`.`tab2`",
         "tableColumns" -> "`i`, `x`",
         "dataColumns" -> "`col1`")
     )


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to