This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5fec76dc8db2 [SPARK-46043][SQL] Support create table using DSv2 sources
5fec76dc8db2 is described below
commit 5fec76dc8db2499b0a9d76231f9a250871d59658
Author: allisonwang-db <[email protected]>
AuthorDate: Tue Dec 5 09:35:14 2023 +0800
[SPARK-46043][SQL] Support create table using DSv2 sources
### What changes were proposed in this pull request?
This PR supports `CREATE TABLE ... USING source` for DSv2 sources.
### Why are the changes needed?
To support creating DSv2 tables in SQL. Currently, creating the table succeeds,
but selecting from a DSv2 table created in SQL fails with this error:
```
org.apache.spark.sql.AnalysisException:
org.apache.spark.sql.connector.SimpleDataSourceV2 is not a valid Spark SQL Data
Source.
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New unit tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43949 from allisonwang-db/spark-46043-dsv2-create-table.
Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-classes.json | 13 +++
...-cannot-create-data-source-table-error-class.md | 32 ++++++
docs/sql-error-conditions.md | 8 ++
.../spark/sql/catalyst/util/QuotingUtils.scala | 8 ++
.../sql/connector/catalog/CatalogV2Implicits.scala | 6 ++
.../datasources/v2/DataSourceV2Relation.scala | 19 +++-
.../catalyst/analysis/ResolveSessionCatalog.scala | 14 +--
.../datasources/v2/DataSourceV2Utils.scala | 16 +++
.../datasources/v2/V2SessionCatalog.scala | 104 +++++++++++++++++---
.../connector/JavaSchemaRequiredDataSource.java | 4 +-
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 15 +--
.../spark/sql/connector/DataSourceV2Suite.scala | 108 ++++++++++++++++++++-
.../spark/sql/connector/FakeV2Provider.scala | 63 ++++++++++++
.../spark/sql/connector/InsertIntoTests.scala | 12 ++-
.../sql/connector/TestV2SessionCatalogBase.scala | 5 +-
.../spark/sql/connector/V1WriteFallbackSuite.scala | 3 +-
.../execution/command/PlanResolutionSuite.scala | 2 +-
17 files changed, 384 insertions(+), 48 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json
b/common/utils/src/main/resources/error/error-classes.json
index a808be9510cf..e54d346e1bc1 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -149,6 +149,19 @@
],
"sqlState" : "42846"
},
+ "CANNOT_CREATE_DATA_SOURCE_TABLE" : {
+ "message" : [
+ "Failed to create data source table <tableName>:"
+ ],
+ "subClass" : {
+ "EXTERNAL_METADATA_UNSUPPORTED" : {
+ "message" : [
+ "provider '<provider>' does not support external metadata but a
schema is provided. Please remove the schema when creating the table."
+ ]
+ }
+ },
+ "sqlState" : "42KDE"
+ },
"CANNOT_DECODE_URL" : {
"message" : [
"The provided URL cannot be decoded: <url>. Please ensure that the URL
is properly formatted and try again."
diff --git
a/docs/sql-error-conditions-cannot-create-data-source-table-error-class.md
b/docs/sql-error-conditions-cannot-create-data-source-table-error-class.md
new file mode 100644
index 000000000000..f52e4f462bc9
--- /dev/null
+++ b/docs/sql-error-conditions-cannot-create-data-source-table-error-class.md
@@ -0,0 +1,32 @@
+---
+layout: global
+title: CANNOT_CREATE_DATA_SOURCE_TABLE error class
+displayTitle: CANNOT_CREATE_DATA_SOURCE_TABLE error class
+license: |
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+---
+
+[SQLSTATE:
42KDE](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Failed to create data source table `<tableName>`:
+
+This error class has the following derived error classes:
+
+## EXTERNAL_METADATA_UNSUPPORTED
+
+provider '`<provider>`' does not support external metadata but a schema is
provided. Please remove the schema when creating the table.
+
+
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 8074ba9233d0..c9990d3856cd 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -163,6 +163,14 @@ Cannot convert SQL `<sqlColumn>` to Protobuf
`<protobufColumn>` because schema i
Cannot convert SQL `<sqlColumn>` to Protobuf `<protobufColumn>` because
`<data>` is not in defined values for enum: `<enumString>`.
+###
[CANNOT_CREATE_DATA_SOURCE_TABLE](sql-error-conditions-cannot-create-data-source-table-error-class.html)
+
+[SQLSTATE:
42KDE](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Failed to create data source table `<tableName>`:
+
+For more details see
[CANNOT_CREATE_DATA_SOURCE_TABLE](sql-error-conditions-cannot-create-data-source-table-error-class.html)
+
### CANNOT_DECODE_URL
[SQLSTATE: 22546](sql-error-conditions-sqlstates.html#class-22-data-exception)
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/QuotingUtils.scala
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/QuotingUtils.scala
index 43af533b85a4..97d110f7ffc5 100644
---
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/QuotingUtils.scala
+++
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/QuotingUtils.scala
@@ -61,6 +61,14 @@ object QuotingUtils {
}
}
+ def fullyQuoted(ident: Identifier): String = {
+ if (ident.namespace.nonEmpty) {
+ ident.namespace.map(quoteIdentifier).mkString(".") + "." +
quoteIdentifier(ident.name)
+ } else {
+ quoteIdentifier(ident.name)
+ }
+ }
+
def escapeSingleQuotedString(str: String): String = {
val builder = new StringBuilder
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
index 133011ad9fac..2b712241633b 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
@@ -144,10 +144,16 @@ private[sql] object CatalogV2Implicits {
}
implicit class IdentifierHelper(ident: Identifier) {
+ /* Quote the identifier if needed. */
def quoted: String = {
QuotingUtils.quoted(ident)
}
+ /* Always quote the identifier. */
+ def fullyQuoted: String = {
+ QuotingUtils.fullyQuoted(ident)
+ }
+
def original: String = ident.namespace() :+ ident.name() mkString "."
def asMultipartIdentifier: Seq[String] = (ident.namespace :+
ident.name).toImmutableArraySeq
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 92638a152871..573b0274e958 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -17,11 +17,12 @@
package org.apache.spark.sql.execution.datasources.v2
+import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation,
NamedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap,
AttributeReference, Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat,
ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan,
Statistics}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
-import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
+import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, truncatedString,
CharVarcharUtils}
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog,
Identifier, SupportsMetadataColumns, Table, TableCapability}
import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics,
SupportsReportStatistics}
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
@@ -61,7 +62,19 @@ case class DataSourceV2Relation(
Nil
}
- override def name: String = table.name()
+ override def name: String = {
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+ (catalog, identifier) match {
+ case (Some(cat), Some(ident)) =>
s"${quoteIfNeeded(cat.name())}.${ident.quoted}"
+ case (None, None) => table.name()
+ case _ =>
+ throw SparkException.internalError(
+ "Invalid catalog and identifier pair. Both 'catalog' and
'identifier' must be " +
+ s"specified or leave as None. Current input - " +
+ s"catalog: '${catalog.map(_.name()).getOrElse(None)}', " +
+ s"identifier: ${identifier.map(_.quoted).getOrElse(None)}.")
+ }
+ }
override def skipSchemaResolution: Boolean =
table.supports(TableCapability.ACCEPT_ANY_SCHEMA)
@@ -127,7 +140,7 @@ case class DataSourceV2ScanRelation(
keyGroupedPartitioning: Option[Seq[Expression]] = None,
ordering: Option[Seq[SortOrder]] = None) extends LeafNode with
NamedRelation {
- override def name: String = relation.table.name()
+ override def name: String = relation.name
override def simpleString(maxFields: Int): String = {
s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index b4c549a90191..d44de0b260b2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -31,8 +31,8 @@ import
org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Lo
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{CreateTable =>
CreateTableV1, DataSource}
-import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
+import org.apache.spark.sql.execution.datasources.{CreateTable =>
CreateTableV1}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.connector.V1Function
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
@@ -612,15 +612,7 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
}
private def isV2Provider(provider: String): Boolean = {
- // Return earlier since `lookupDataSourceV2` may fail to resolve provider
"hive" to
- // `HiveFileFormat`, when running tests in sql/core.
- if (DDLUtils.isHiveTable(Some(provider))) return false
- DataSource.lookupDataSourceV2(provider, conf) match {
- // TODO(SPARK-28396): Currently file source v2 can't work with tables.
- case Some(_: FileDataSourceV2) => false
- case Some(_) => true
- case _ => false
- }
+ DataSourceV2Utils.getTableProvider(provider, conf).isDefined
}
private object DatabaseInSessionCatalog {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index ef8e4605f472..9ffa0d728ca2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -31,6 +31,8 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{CatalogV2Util,
SessionConfigSupport, SupportsCatalogOptions, SupportsRead, Table,
TableProvider}
import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{LongType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -151,6 +153,20 @@ private[sql] object DataSourceV2Utils extends Logging {
}
}
+ /**
+ * Returns the table provider for the given format, or None if it cannot be
found.
+ */
+ def getTableProvider(provider: String, conf: SQLConf): Option[TableProvider]
= {
+ // Return earlier since `lookupDataSourceV2` may fail to resolve provider
"hive" to
+ // `HiveFileFormat`, when running tests in sql/core.
+ if (DDLUtils.isHiveTable(Some(provider))) return None
+ DataSource.lookupDataSourceV2(provider, conf) match {
+ // TODO(SPARK-28396): Currently file source v2 can't work with tables.
+ case Some(p) if !p.isInstanceOf[FileDataSourceV2] => Some(p)
+ case _ => None
+ }
+ }
+
private lazy val objectMapper = new ObjectMapper()
def getOptionsWithPaths(
extraOptions: CaseInsensitiveMap[String],
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 5a8f2e2d8a40..5b1ff7c67b26 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -23,9 +23,10 @@ import java.util
import scala.collection.mutable
import scala.jdk.CollectionConverters._
+import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper,
TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException,
NoSuchTableException, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable,
CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase,
CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils,
ClusterBySpec, SessionCatalog}
import org.apache.spark.sql.catalyst.util.TypeUtils._
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util,
Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces,
Table, TableCatalog, TableCatalogCapability, TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
@@ -71,9 +72,44 @@ class V2SessionCatalog(catalog: SessionCatalog)
}
}
+ // Get data source options from the catalog table properties with the path
option.
+ private def getDataSourceOptions(
+ properties: Map[String, String],
+ storage: CatalogStorageFormat): CaseInsensitiveStringMap = {
+ val propertiesWithPath = properties ++
+ storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+ new CaseInsensitiveStringMap(propertiesWithPath.asJava)
+ }
+
override def loadTable(ident: Identifier): Table = {
try {
- V1Table(catalog.getTableMetadata(ident.asTableIdentifier))
+ val table = catalog.getTableMetadata(ident.asTableIdentifier)
+ if (table.provider.isDefined) {
+ DataSourceV2Utils.getTableProvider(table.provider.get, conf) match {
+ case Some(provider) =>
+ // Get the table properties during creation and append the path
option
+ // to the properties.
+ val dsOptions = getDataSourceOptions(table.properties,
table.storage)
+ // If the source accepts external table metadata, we can pass the
schema and
+ // partitioning information stored in Hive to `getTable` to avoid
expensive
+ // schema/partitioning inference.
+ if (provider.supportsExternalMetadata()) {
+ provider.getTable(
+ table.schema,
+ getV2Partitioning(table),
+ dsOptions.asCaseSensitiveMap())
+ } else {
+ provider.getTable(
+ provider.inferSchema(dsOptions),
+ provider.inferPartitioning(dsOptions),
+ dsOptions.asCaseSensitiveMap())
+ }
+ case _ =>
+ V1Table(table)
+ }
+ } else {
+ V1Table(table)
+ }
} catch {
case _: NoSuchDatabaseException =>
throw QueryCompilationErrors.noSuchTableError(ident)
@@ -96,6 +132,16 @@ class V2SessionCatalog(catalog: SessionCatalog)
throw QueryCompilationErrors.timeTravelUnsupportedError(toSQLId(nameParts))
}
+ private def getV2Partitioning(table: CatalogTable): Array[Transform] = {
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+ val v2Partitioning = table.partitionColumnNames.asTransforms
+ val v2Bucketing = table.bucketSpec.map(
+ spec => Array(spec.asTransform)).getOrElse(Array.empty)
+ val v2Clustering = table.clusterBySpec.map(
+ spec => Array(spec.asTransform)).getOrElse(Array.empty)
+ v2Partitioning ++ v2Bucketing ++ v2Clustering
+ }
+
override def invalidateTable(ident: Identifier): Unit = {
catalog.refreshTable(ident.asTableIdentifier)
}
@@ -114,14 +160,12 @@ class V2SessionCatalog(catalog: SessionCatalog)
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): Table = {
- import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
- val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) =
- partitions.toImmutableArraySeq.convertTransforms
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER,
conf.defaultDataSourceName)
- val tableProperties = properties.asScala
+ val tableProperties = properties.asScala.toMap
val location = Option(properties.get(TableCatalog.PROP_LOCATION))
- val storage =
DataSource.buildStorageFormatFromOptions(toOptions(tableProperties.toMap))
- .copy(locationUri = location.map(CatalogUtils.stringToURI))
+ val storage =
DataSource.buildStorageFormatFromOptions(toOptions(tableProperties))
+ .copy(locationUri = location.map(CatalogUtils.stringToURI))
val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL)
val tableType = if (isExternal || location.isDefined) {
CatalogTableType.EXTERNAL
@@ -129,17 +173,55 @@ class V2SessionCatalog(catalog: SessionCatalog)
CatalogTableType.MANAGED
}
+ val (newSchema, newPartitions) =
DataSourceV2Utils.getTableProvider(provider, conf) match {
+ // If the provider does not support external metadata, users should not
be allowed to
+ // specify custom schema when creating the data source table, since the
schema will not
+ // be used when loading the table.
+ case Some(p) if !p.supportsExternalMetadata() =>
+ if (schema.nonEmpty) {
+ throw new SparkUnsupportedOperationException(
+ errorClass =
"CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+ messageParameters = Map("tableName" -> ident.fullyQuoted,
"provider" -> provider))
+ }
+ // V2CreateTablePlan does not allow non-empty partitions when schema
is empty. This
+ // is checked in `PreProcessTableCreation` rule.
+ assert(partitions.isEmpty,
+ s"Partitions should be empty when the schema is empty:
${partitions.mkString(", ")}")
+ (schema, partitions)
+
+ case Some(tableProvider) =>
+ assert(tableProvider.supportsExternalMetadata())
+ lazy val dsOptions = getDataSourceOptions(tableProperties, storage)
+ if (schema.isEmpty) {
+ assert(partitions.isEmpty,
+ s"Partitions should be empty when the schema is empty:
${partitions.mkString(", ")}")
+ // Infer the schema and partitions and store them in the catalog.
+ (tableProvider.inferSchema(dsOptions),
tableProvider.inferPartitioning(dsOptions))
+ } else if (partitions.isEmpty) {
+ (schema, tableProvider.inferPartitioning(dsOptions))
+ } else {
+ (schema, partitions)
+ }
+
+ case _ =>
+ // The provider is not a V2 provider so we return the schema and
partitions as is.
+ (schema, partitions)
+ }
+
+ val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) =
+ newPartitions.toImmutableArraySeq.convertTransforms
+
val tableDesc = CatalogTable(
identifier = ident.asTableIdentifier,
tableType = tableType,
storage = storage,
- schema = schema,
+ schema = newSchema,
provider = Some(provider),
partitionColumnNames = partitionColumns,
bucketSpec = maybeBucketSpec,
- properties = tableProperties.toMap ++
+ properties = tableProperties ++
maybeClusterBySpec.map(
- clusterBySpec => ClusterBySpec.toProperty(schema, clusterBySpec,
conf.resolver)),
+ clusterBySpec => ClusterBySpec.toProperty(newSchema, clusterBySpec,
conf.resolver)),
tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
comment = Option(properties.get(TableCatalog.PROP_COMMENT)))
diff --git
a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSchemaRequiredDataSource.java
b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSchemaRequiredDataSource.java
index e7d728fc5a08..91b6621e7767 100644
---
a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSchemaRequiredDataSource.java
+++
b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSchemaRequiredDataSource.java
@@ -44,7 +44,9 @@ public class JavaSchemaRequiredDataSource implements
TableProvider {
@Override
public InputPartition[] planInputPartitions() {
- return new InputPartition[0];
+ InputPartition[] partitions = new InputPartition[1];
+ partitions[0] = new JavaRangeInputPartition(0, 2);
+ return partitions;
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index b92b512aa1d3..2b93e8bd3200 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -45,10 +45,8 @@ import
org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE,
PartitionOverwriteMode, V2_SESSION_CATALOG_IMPLEMENTATION}
-import org.apache.spark.sql.internal.connector.SimpleTableProvider
import org.apache.spark.sql.sources.SimpleScanSource
import org.apache.spark.sql.types.{LongType, StringType, StructType}
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.UTF8String
abstract class DataSourceV2SQLSuite
@@ -1120,7 +1118,7 @@ class DataSourceV2SQLSuiteV1Filter
},
errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
parameters = Map(
- "tableName" -> "`default`.`tbl`",
+ "tableName" -> "`spark_catalog`.`default`.`tbl`",
"tableColumns" -> "`id`, `data`",
"dataColumns" -> "`col1`"
)
@@ -1155,7 +1153,7 @@ class DataSourceV2SQLSuiteV1Filter
},
errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
parameters = Map(
- "tableName" -> "`default`.`tbl`",
+ "tableName" -> "`spark_catalog`.`default`.`tbl`",
"tableColumns" -> "`id`, `data`",
"dataColumns" -> "`col1`"
)
@@ -1191,7 +1189,7 @@ class DataSourceV2SQLSuiteV1Filter
},
errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
parameters = Map(
- "tableName" -> "`default`.`tbl`",
+ "tableName" -> "`spark_catalog`.`default`.`tbl`",
"tableColumns" -> "`id`, `data`, `data2`",
"dataColumns" -> "`col1`, `col2`"
)
@@ -3390,13 +3388,6 @@ class DataSourceV2SQLSuiteV2Filter extends
DataSourceV2SQLSuite {
override protected val catalogAndNamespace = "testv2filter.ns1.ns2."
}
-/** Used as a V2 DataSource for V2SessionCatalog DDL */
-class FakeV2Provider extends SimpleTableProvider {
- override def getTable(options: CaseInsensitiveStringMap): Table = {
- throw new UnsupportedOperationException("Unnecessary for DDL tests")
- }
-}
-
class ReserveSchemaNullabilityCatalog extends InMemoryCatalog {
override def useNullableQuerySchema(): Boolean = false
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index e61f8cb0bf06..f2e518e8acc4 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -22,6 +22,7 @@ import java.util.OptionalLong
import test.org.apache.spark.sql.connector._
+import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{PartitionInternalRow,
SupportsRead, Table, TableCapability, TableProvider}
@@ -231,11 +232,11 @@ class DataSourceV2Suite extends QueryTest with
SharedSparkSession with AdaptiveS
val e =
intercept[IllegalArgumentException](spark.read.format(cls.getName).load())
assert(e.getMessage.contains("requires a user-supplied schema"))
- val schema = new StructType().add("i", "int").add("s", "string")
+ val schema = new StructType().add("i", "int").add("j", "int")
val df = spark.read.format(cls.getName).schema(schema).load()
assert(df.schema == schema)
- assert(df.collect().isEmpty)
+ checkAnswer(df, Seq(Row(0, 0), Row(1, -1)))
}
}
}
@@ -633,6 +634,95 @@ class DataSourceV2Suite extends QueryTest with
SharedSparkSession with AdaptiveS
}
}
}
+
+ test("SPARK-46043: create table in SQL using a DSv2 source") {
+ Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach
{ cls =>
+ withClue(cls.getName) {
+ // Create a table with empty schema.
+ withTable("test") {
+ sql(s"CREATE TABLE test USING ${cls.getName}")
+ checkAnswer(
+ sql(s"SELECT * FROM test WHERE i < 3"),
+ Seq(Row(0, 0), Row(1, -1), Row(2, -2)))
+ }
+ // Create a table with non-empty schema is not allowed.
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ sql(s"CREATE TABLE test(a INT, b INT) USING ${cls.getName}")
+ },
+ errorClass =
"CANNOT_CREATE_DATA_SOURCE_TABLE.EXTERNAL_METADATA_UNSUPPORTED",
+ parameters = Map("tableName" -> "`default`.`test`", "provider" ->
cls.getName)
+ )
+ }
+ }
+ }
+
+ test("SPARK-46043: create table in SQL with schema required data source") {
+ val cls = classOf[SchemaRequiredDataSource]
+ val e = intercept[IllegalArgumentException] {
+ sql(s"CREATE TABLE test USING ${cls.getName}")
+ }
+ assert(e.getMessage.contains("requires a user-supplied schema"))
+ withTable("test") {
+ sql(s"CREATE TABLE test(i INT, j INT) USING ${cls.getName}")
+ checkAnswer(sql(s"SELECT * FROM test"), Seq(Row(0, 0), Row(1, -1)))
+ }
+ withTable("test") {
+ sql(s"CREATE TABLE test(i INT) USING ${cls.getName}")
+ checkAnswer(sql(s"SELECT * FROM test"), Seq(Row(0), Row(1)))
+ }
+ withTable("test") {
+ // Test the behavior when there is a mismatch between the schema defined
in the
+ // CREATE TABLE command and the actual schema produced by the data
source. The
+ // resulting behavior is not guaranteed and may vary based on the data
source's
+ // implementation.
+ sql(s"CREATE TABLE test(i INT, j INT, k INT) USING ${cls.getName}")
+ val e = intercept[Exception] {
+ sql("SELECT * FROM test").collect()
+ }
+ assert(e.getMessage.contains(
+ "java.lang.ArrayIndexOutOfBoundsException: Index 2 out of bounds for
length 2"))
+ }
+ }
+
+ test("SPARK-46043: create table in SQL with partitioning required data
source") {
+ val cls = classOf[PartitionsRequiredDataSource]
+ val e = intercept[IllegalArgumentException](
+ sql(s"CREATE TABLE test(a INT) USING ${cls.getName}"))
+ assert(e.getMessage.contains("user-supplied partitioning"))
+ withTable("test") {
+ sql(s"CREATE TABLE test(i INT, j INT) USING ${cls.getName} PARTITIONED
BY (i)")
+ checkAnswer(sql(s"SELECT * FROM test"), Seq(Row(0, 0), Row(1, -1)))
+ }
+ }
+
+ test("SPARK-46043: create table in SQL with path option") {
+ val cls = classOf[SupportsExternalMetadataDataSource]
+ withTempDir { dir =>
+ val path = s"${dir.getCanonicalPath}/test"
+ Seq((0, 1), (1, 2)).toDF("x", "y").write.format("csv").save(path)
+ withTable("test") {
+ sql(
+ s"""
+ |CREATE TABLE test USING ${cls.getName}
+ |OPTIONS (PATH '$path')
+ |""".stripMargin)
+ checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(1, 2)))
+ sql(
+ s"""
+ |CREATE OR REPLACE TABLE test USING ${cls.getName}
+ |OPTIONS (PATH '${dir.getCanonicalPath}/non-existing')
+ |""".stripMargin)
+ checkAnswer(sql("SELECT * FROM test"), Nil)
+ sql(
+ s"""
+ |CREATE OR REPLACE TABLE test USING ${cls.getName}
+ |LOCATION '$path'
+ |""".stripMargin)
+ checkAnswer(sql("SELECT * FROM test"), Seq(Row(0, 1), Row(1, 2)))
+ }
+ }
+ }
}
@@ -910,7 +1000,9 @@ class AdvancedReaderFactory(requiredSchema: StructType)
extends PartitionReaderF
class SchemaRequiredDataSource extends TableProvider {
class MyScanBuilder(schema: StructType) extends SimpleScanBuilder {
- override def planInputPartitions(): Array[InputPartition] = Array.empty
+ override def planInputPartitions(): Array[InputPartition] = {
+ Array(RangeInputPartition(0, 2))
+ }
override def readSchema(): StructType = schema
}
@@ -936,6 +1028,12 @@ class SchemaRequiredDataSource extends TableProvider {
}
}
+class PartitionsRequiredDataSource extends SchemaRequiredDataSource {
+ override def inferPartitioning(options: CaseInsensitiveStringMap):
Array[Transform] = {
+ throw new IllegalArgumentException("requires user-supplied partitioning")
+ }
+}
+
class ColumnarDataSourceV2 extends TestingV2Source {
class MyScanBuilder extends SimpleScanBuilder {
@@ -1127,6 +1225,10 @@ class SimpleWriteOnlyDataSource extends
SimpleWritableDataSource {
}
}
+class SupportsExternalMetadataDataSource extends SimpleWritableDataSource {
+ override def supportsExternalMetadata(): Boolean = true
+}
+
class SupportsExternalMetadataWritableDataSource extends
SimpleWritableDataSource {
override def supportsExternalMetadata(): Boolean = true
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
new file mode 100644
index 000000000000..174700e8d24f
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import java.util
+
+import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.read.{InputPartition, ScanBuilder}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * Used as a V2 DataSource for V2SessionCatalog DDL.
+ *
+ * `inferSchema` ignores its options and always reports `FakeV2Provider.schema`
+ * (int columns `i` and `j`). Every scan plans the same two input partitions,
+ * `RangeInputPartition(0, 5)` and `RangeInputPartition(5, 10)`.
+ *
+ * The three-argument `getTable` forwards only `properties`. Its `schema` and
+ * `partitioning` arguments are not read.
+ * NOTE(review): because `supportsExternalMetadata()` is true, a user-specified schema
+ * is accepted but not passed into the table built here. The table's schema comes from
+ * `SimpleBatchTable`, which is defined elsewhere; confirm it matches `FakeV2Provider.schema`.
+ */
+class FakeV2Provider extends TableProvider {
+
+ // Supports external metadata so that users can specify schema and partitions
+ // when creating the table.
+ override def supportsExternalMetadata(): Boolean = true
+
+ class MyScanBuilder extends SimpleScanBuilder { // Fixed partition plan shared by every scan.
+ override def planInputPartitions(): Array[InputPartition] = {
+ Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10))
+ }
+ }
+
+ override def inferSchema(options: CaseInsensitiveStringMap): StructType = { // `options` is ignored.
+ FakeV2Provider.schema
+ }
+
+ def getTable(options: CaseInsensitiveStringMap): Table = { // Overload used by the 3-arg getTable; `options` is not read.
+ new SimpleBatchTable {
+ override def newScanBuilder(options: CaseInsensitiveStringMap): // Per-scan options are ignored as well.
ScanBuilder = {
+ new MyScanBuilder()
+ }
+ }
+ }
+
+ override def getTable(
+ schema: StructType,
+ partitioning: Array[Transform],
+ properties: util.Map[String, String]): Table = {
+ getTable(new CaseInsensitiveStringMap(properties)) // Only `properties` is used; schema/partitioning are dropped.
+ }
+}
+
+object FakeV2Provider { // Companion holding the fixed schema returned by FakeV2Provider.inferSchema.
+ val schema: StructType = new StructType().add("i", "int").add("j", "int") // Two int columns: i, j.
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
index 63bb0148972c..fa30969d65c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
@@ -126,7 +126,11 @@ abstract class InsertIntoTests(
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
verifyTable(t1, Seq.empty[(Long, String, String)].toDF("id", "data",
"missing"))
- val tableName = if (catalogAndNamespace.isEmpty) toSQLId(s"default.$t1")
else toSQLId(t1)
+ val tableName = if (catalogAndNamespace.isEmpty) {
+ toSQLId(s"spark_catalog.default.$t1")
+ } else {
+ toSQLId(t1)
+ }
checkError(
exception = intercept[AnalysisException] {
doInsert(t1, df)
@@ -145,7 +149,11 @@ abstract class InsertIntoTests(
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
val df = Seq((1L, "a", "mango")).toDF("id", "data", "fruit")
verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data"))
- val tableName = if (catalogAndNamespace.isEmpty) toSQLId(s"default.$t1")
else toSQLId(t1)
+ val tableName = if (catalogAndNamespace.isEmpty) {
+ toSQLId(s"spark_catalog.default.$t1")
+ } else {
+ toSQLId(t1)
+ }
checkError(
exception = intercept[AnalysisException] {
doInsert(t1, df)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
index 1396ef82925a..719b006c1460 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
@@ -78,6 +78,7 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table]
extends Delegating
schema: StructType,
partitions: Array[Transform],
properties: java.util.Map[String, String]): Table = {
+ import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
val key = TestV2SessionCatalogBase.SIMULATE_ALLOW_EXTERNAL_PROPERTY
val propsWithLocation = if (properties.containsKey(key)) {
// Always set a location so that CREATE EXTERNAL TABLE won't fail with
LOCATION not specified.
@@ -92,8 +93,8 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table]
extends Delegating
} else {
properties
}
- val created = super.createTable(ident, schema, partitions,
propsWithLocation)
- val t = newTable(created.name(), schema, partitions, propsWithLocation)
+ super.createTable(ident, schema, partitions, propsWithLocation)
+ val t = newTable(ident.quoted, schema, partitions, propsWithLocation)
addTable(ident, t)
t
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
index ec62739b9cf2..181dc0ea2074 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
@@ -36,7 +36,6 @@ import
org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoI
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.internal.SQLConf.{OPTIMIZER_MAX_ITERATIONS,
V2_SESSION_CATALOG_IMPLEMENTATION}
-import org.apache.spark.sql.internal.connector.SimpleTableProvider
import org.apache.spark.sql.sources._
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
@@ -250,7 +249,7 @@ private object InMemoryV1Provider {
}
class InMemoryV1Provider
- extends SimpleTableProvider
+ extends FakeV2Provider
with DataSourceRegister
with CreatableRelationProvider {
override def getTable(options: CaseInsensitiveStringMap): Table = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index e39cc91d5f04..69b3285fc7f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -1275,7 +1275,7 @@ class PlanResolutionSuite extends AnalysisTest {
},
errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
parameters = Map(
- "tableName" -> "`tab2`",
+ "tableName" -> "`testcat`.`tab2`",
"tableColumns" -> "`i`, `x`",
"dataColumns" -> "`col1`")
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]