This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1cc1459 [SPARK-36850][SQL][FOLLOWUP] Simplify exception code and fix wrong condition for CTAS and RTAS
1cc1459 is described below
commit 1cc14598c7487887016793c0723624ceb4f13a8c
Author: Huaxin Gao <[email protected]>
AuthorDate: Mon Dec 13 12:34:27 2021 +0800
[SPARK-36850][SQL][FOLLOWUP] Simplify exception code and fix wrong condition for CTAS and RTAS
### What changes were proposed in this pull request?
This followup fixes a few problems:
1. Addressed this [comment](https://github.com/apache/spark/pull/34060#discussion_r765992537).
2. Combined several `xxxOnlySupportedWithV2TableError` methods into a single parameterized `operationOnlySupportedWithV2TableError`.
3. In CTAS and RTAS, `if isSessionCatalog(catalog)` should not be a guard on the pattern; the condition belongs in the case body as `if (isSessionCatalog(catalog) && !isV2Provider(provider))`. Otherwise `c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform)` is never applied in the non-SessionCatalog case (see the sketch below).
I tried moving `c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform)` into `AstBuilder`, but it failed [here](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala#L850), so I kept it in `ResolveSessionCatalog`.
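To make item 3 concrete, here is a minimal standalone Scala sketch of the guard-vs-body difference (toy `Ctas` type and simplified `isSessionCatalog`, not Spark's actual classes; the `isV2Provider` half of the condition is elided):

```scala
object GuardVsBody extends App {
  // Toy stand-in for CreateTableAsSelect; not Spark's class.
  case class Ctas(catalog: String, bucketSpec: Option[String], partitioning: Seq[String])

  def isSessionCatalog(catalog: String): Boolean = catalog == "spark_catalog"

  // Before the fix: the check was a pattern guard, so a CTAS against a
  // non-session catalog never matched this case, and nothing rewrote its
  // bucket spec into a partition transform.
  def resolveWithGuard(c: Ctas): Ctas = c match {
    case ctas if isSessionCatalog(ctas.catalog) =>
      ctas // v1 CreateTable path
    case other =>
      other // non-session catalogs fall through untouched
  }

  // After the fix: every CTAS enters the case, and the else branch performs
  // the bucket-spec-to-transform rewrite for non-session catalogs.
  def resolveInBody(c: Ctas): Ctas =
    if (isSessionCatalog(c.catalog)) c // v1 CreateTable path
    else c.copy(
      partitioning = c.partitioning ++ c.bucketSpec.map(b => s"bucket($b)"),
      bucketSpec = None)

  val ctas = Ctas("testcat", bucketSpec = Some("4, data"), partitioning = Seq("id"))
  println(resolveWithGuard(ctas)) // bucket spec still present, partitioning unchanged
  println(resolveInBody(ctas))    // partitioning = List(id, bucket(4, data))
}
```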
### Why are the changes needed?
Code cleanup and bug fixes.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests, plus new SPARK-36850 tests added to `DataSourceV2SQLSuite`.
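The new tests (see the diff below) boil down to a check like the following sketch, assuming the suite's `testcat` catalog and `source` table:

```scala
spark.sql(
  """CREATE TABLE testcat.table_name USING foo PARTITIONED BY (id)
    |CLUSTERED BY (data) INTO 4 BUCKETS AS SELECT * FROM source""".stripMargin)
// After the fix, DESCRIBE reports both the partition column and the
// bucket transform: Part 0 = id, Part 1 = bucket(4, data).
val parts = spark.sql("DESCRIBE testcat.table_name")
  .filter("col_name IN ('Part 0', 'Part 1')")
  .select("data_type").collect().map(_.getString(0))
assert(parts.sameElements(Array("id", "bucket(4, data)")))
```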
Closes #34857 from huaxingao/followup.
Lead-authored-by: Huaxin Gao <[email protected]>
Co-authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/errors/QueryCompilationErrors.scala | 28 +--------
.../catalyst/analysis/ResolveSessionCatalog.scala | 73 ++++++++++++----------
.../spark/sql/streaming/DataStreamWriter.scala | 4 +-
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 38 +++++++++++
4 files changed, 82 insertions(+), 61 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 920a748..4843051 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -479,48 +479,24 @@ object QueryCompilationErrors {
new AnalysisException("ADD COLUMN with v1 tables cannot specify NOT NULL.")
}
- def replaceColumnsOnlySupportedWithV2TableError(): Throwable = {
- new AnalysisException("REPLACE COLUMNS is only supported with v2 tables.")
- }
-
- def alterQualifiedColumnOnlySupportedWithV2TableError(): Throwable = {
- new AnalysisException("ALTER COLUMN with qualified column is only
supported with v2 tables.")
+ def operationOnlySupportedWithV2TableError(operation: String): Throwable = {
+ new AnalysisException(s"$operation is only supported with v2 tables.")
}
def alterColumnWithV1TableCannotSpecifyNotNullError(): Throwable = {
new AnalysisException("ALTER COLUMN with v1 tables cannot specify NOT
NULL.")
}
- def alterOnlySupportedWithV2TableError(): Throwable = {
- new AnalysisException("ALTER COLUMN ... FIRST | ALTER is only supported
with v2 tables.")
- }
-
def alterColumnCannotFindColumnInV1TableError(colName: String, v1Table: V1Table): Throwable = {
new AnalysisException(
s"ALTER COLUMN cannot find column $colName in v1 table. " +
s"Available: ${v1Table.schema.fieldNames.mkString(", ")}")
}
- def renameColumnOnlySupportedWithV2TableError(): Throwable = {
- new AnalysisException("RENAME COLUMN is only supported with v2 tables.")
- }
-
- def dropColumnOnlySupportedWithV2TableError(): Throwable = {
- new AnalysisException("DROP COLUMN is only supported with v2 tables.")
- }
-
def invalidDatabaseNameError(quoted: String): Throwable = {
new AnalysisException(s"The database name is not valid: $quoted")
}
- def replaceTableOnlySupportedWithV2TableError(): Throwable = {
- new AnalysisException("REPLACE TABLE is only supported with v2 tables.")
- }
-
- def replaceTableAsSelectOnlySupportedWithV2TableError(): Throwable = {
- new AnalysisException("REPLACE TABLE AS SELECT is only supported with v2
tables.")
- }
-
def cannotDropViewWithDropTableError(): Throwable = {
new AnalysisException("Cannot drop a view with DROP TABLE. Please use DROP
VIEW instead")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index d5f99bf..15798e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -56,17 +56,19 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
AlterTableAddColumnsCommand(ident.asTableIdentifier, cols.map(convertToStructField))
case ReplaceColumns(ResolvedV1TableIdentifier(_), _) =>
- throw QueryCompilationErrors.replaceColumnsOnlySupportedWithV2TableError
+ throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("REPLACE COLUMNS")
case a @ AlterColumn(ResolvedV1TableAndIdentifier(table, ident), _, _, _, _, _) =>
if (a.column.name.length > 1) {
- throw QueryCompilationErrors.alterQualifiedColumnOnlySupportedWithV2TableError
+ throw QueryCompilationErrors
+ .operationOnlySupportedWithV2TableError("ALTER COLUMN with qualified column")
}
if (a.nullable.isDefined) {
throw QueryCompilationErrors.alterColumnWithV1TableCannotSpecifyNotNullError
}
if (a.position.isDefined) {
- throw QueryCompilationErrors.alterOnlySupportedWithV2TableError
+ throw QueryCompilationErrors
+ .operationOnlySupportedWithV2TableError("ALTER COLUMN ... FIRST | ALTER")
}
val builder = new MetadataBuilder
// Add comment to metadata
@@ -88,10 +90,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
AlterTableChangeColumnCommand(ident.asTableIdentifier, colName, newColumn)
case RenameColumn(ResolvedV1TableIdentifier(_), _, _) =>
- throw QueryCompilationErrors.renameColumnOnlySupportedWithV2TableError
+ throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("RENAME COLUMN")
case DropColumns(ResolvedV1TableIdentifier(_), _) =>
- throw QueryCompilationErrors.dropColumnOnlySupportedWithV2TableError
+ throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("DROP COLUMN")
case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
AlterTableSetPropertiesCommand(ident.asTableIdentifier, props, isView = false)
@@ -145,35 +147,24 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
// session catalog and the table provider is not v2.
case c @ CreateTable(ResolvedDBObjectName(catalog, name), _, _, _, _) =>
val (storageFormat, provider) = getStorageFormatAndProvider(
- c.tableSpec.provider,
- c.tableSpec.options,
- c.tableSpec.location,
- c.tableSpec.serde,
+ c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
ctas = false)
if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
- val tableDesc = buildCatalogTable(name.asTableIdentifier, c.tableSchema,
- c.partitioning, c.tableSpec.bucketSpec, c.tableSpec.properties, provider,
- c.tableSpec.location, c.tableSpec.comment, storageFormat,
- c.tableSpec.external)
- val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
- CreateTableV1(tableDesc, mode, None)
+ constructV1TableCmd(None, c.tableSpec, name, c.tableSchema, c.partitioning,
+ c.ignoreIfExists, storageFormat, provider)
} else {
val newTableSpec = c.tableSpec.copy(bucketSpec = None)
c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
tableSpec = newTableSpec)
}
- case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, _, _)
- if isSessionCatalog(catalog) =>
+ case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, _, _) =>
val (storageFormat, provider) = getStorageFormatAndProvider(
c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
ctas = true)
- if (!isV2Provider(provider)) {
- val tableDesc = buildCatalogTable(name.asTableIdentifier, new StructType,
- c.partitioning, c.tableSpec.bucketSpec, c.tableSpec.properties, provider,
- c.tableSpec.location, c.tableSpec.comment, storageFormat, c.tableSpec.external)
- val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
- CreateTableV1(tableDesc, mode, Some(c.query))
+ if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
+ constructV1TableCmd(Some(c.query), c.tableSpec, name, new StructType, c.partitioning,
+ c.ignoreIfExists, storageFormat, provider)
} else {
val newTableSpec = c.tableSpec.copy(bucketSpec = None)
c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
@@ -189,21 +180,21 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
// For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
// session catalog and the table provider is not v2.
case c @ ReplaceTable(
- ResolvedDBObjectName(catalog, name), _, _, _, _) =>
+ ResolvedDBObjectName(catalog, _), _, _, _, _) =>
val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
- throw QueryCompilationErrors.replaceTableOnlySupportedWithV2TableError
+ throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("REPLACE TABLE")
} else {
val newTableSpec = c.tableSpec.copy(bucketSpec = None)
c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
tableSpec = newTableSpec)
}
- case c @ ReplaceTableAsSelect(ResolvedDBObjectName(catalog, _), _, _, _, _, _)
- if isSessionCatalog(catalog) =>
+ case c @ ReplaceTableAsSelect(ResolvedDBObjectName(catalog, _), _, _, _, _, _) =>
val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
- if (!isV2Provider(provider)) {
- throw QueryCompilationErrors.replaceTableAsSelectOnlySupportedWithV2TableError
+ if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
+ throw QueryCompilationErrors
+ .operationOnlySupportedWithV2TableError("REPLACE TABLE AS SELECT")
} else {
val newTableSpec = c.tableSpec.copy(bucketSpec = None)
c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
@@ -294,7 +285,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
ident.asTableIdentifier,
Seq(partitionSpec).asUnresolvedPartitionSpecs.map(_.spec).headOption)
- case s @ ShowPartitions(
+ case ShowPartitions(
ResolvedV1TableOrViewIdentifier(ident),
pattern @ (None | Some(UnresolvedPartitionSpec(_, _))), output) =>
ShowPartitionsCommand(
@@ -302,7 +293,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
output,
pattern.map(_.asInstanceOf[UnresolvedPartitionSpec].spec))
- case s @ ShowColumns(ResolvedV1TableOrViewIdentifier(ident), ns, output) =>
+ case ShowColumns(ResolvedV1TableOrViewIdentifier(ident), ns, output) =>
val v1TableName = ident.asTableIdentifier
val resolver = conf.resolver
val db = ns match {
@@ -385,10 +376,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}
// If target is view, force use v1 command
- case s @ ShowTableProperties(ResolvedViewIdentifier(ident), propertyKey, output) =>
+ case ShowTableProperties(ResolvedViewIdentifier(ident), propertyKey, output) =>
ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey, output)
- case s @ ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey, output)
+ case ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey, output)
if conf.useV1Command =>
ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey, output)
@@ -435,6 +426,22 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName)
}
+ private def constructV1TableCmd(
+ query: Option[LogicalPlan],
+ tableSpec: TableSpec,
+ name: Seq[String],
+ tableSchema: StructType,
+ partitioning: Seq[Transform],
+ ignoreIfExists: Boolean,
+ storageFormat: CatalogStorageFormat,
+ provider: String): CreateTableV1 = {
+ val tableDesc = buildCatalogTable(name.asTableIdentifier, tableSchema,
+ partitioning, tableSpec.bucketSpec, tableSpec.properties, provider,
+ tableSpec.location, tableSpec.comment, storageFormat, tableSpec.external)
+ val mode = if (ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+ CreateTableV1(tableDesc, mode, query)
+ }
+
private def getStorageFormatAndProvider(
provider: Option[String],
options: Map[String, String],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 2d3c898..f72d03e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -289,7 +289,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
* Note, currently the new table creation by this API doesn't fully cover the V2 table.
* TODO (SPARK-33638): Full support of v2 table creation
*/
- val tableProperties = TableSpec(
+ val tableSpec = TableSpec(
None,
Map.empty[String, String],
Some(source),
@@ -304,7 +304,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
isNamespace = false),
df.schema.asNullable,
partitioningColumns.getOrElse(Nil).asTransforms.toSeq,
- tableProperties,
+ tableSpec,
ignoreIfExists = false)
Dataset.ofRows(df.sparkSession, cmd)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 4c5a001..0b9914f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -407,6 +407,44 @@ class DataSourceV2SQLSuite
}
}
+ test("SPARK-36850: CreateTableAsSelect partitions can be specified using " +
+ "PARTITIONED BY and/or CLUSTERED BY") {
+ val identifier = "testcat.table_name"
+ withTable(identifier) {
+ spark.sql(s"CREATE TABLE $identifier USING foo PARTITIONED BY (id) " +
+ s"CLUSTERED BY (data) INTO 4 BUCKETS AS SELECT * FROM source")
+ val describe = spark.sql(s"DESCRIBE $identifier")
+ val part1 = describe
+ .filter("col_name = 'Part 0'")
+ .select("data_type").head.getString(0)
+ assert(part1 === "id")
+ val part2 = describe
+ .filter("col_name = 'Part 1'")
+ .select("data_type").head.getString(0)
+ assert(part2 === "bucket(4, data)")
+ }
+ }
+
+ test("SPARK-36850: ReplaceTableAsSelect partitions can be specified using " +
+ "PARTITIONED BY and/or CLUSTERED BY") {
+ val identifier = "testcat.table_name"
+ withTable(identifier) {
+ spark.sql(s"CREATE TABLE $identifier USING foo " +
+ "AS SELECT id FROM source")
+ spark.sql(s"REPLACE TABLE $identifier USING foo PARTITIONED BY (id) " +
+ s"CLUSTERED BY (data) INTO 4 BUCKETS AS SELECT * FROM source")
+ val describe = spark.sql(s"DESCRIBE $identifier")
+ val part1 = describe
+ .filter("col_name = 'Part 0'")
+ .select("data_type").head.getString(0)
+ assert(part1 === "id")
+ val part2 = describe
+ .filter("col_name = 'Part 1'")
+ .select("data_type").head.getString(0)
+ assert(part2 === "bucket(4, data)")
+ }
+ }
+
test("SPARK-37545: CreateTableAsSelect should store location as qualified") {
val basicIdentifier = "testcat.table_name"
val atomicIdentifier = "testcat_atomic.table_name"