This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cc1459  [SPARK-36850][SQL][FOLLOWUP] Simplify exception code and fix 
wrong condition for CTAS and RTAS
1cc1459 is described below

commit 1cc14598c7487887016793c0723624ceb4f13a8c
Author: Huaxin Gao <[email protected]>
AuthorDate: Mon Dec 13 12:34:27 2021 +0800

    [SPARK-36850][SQL][FOLLOWUP] Simplify exception code and fix wrong 
condition for CTAS and RTAS
    
    ### What changes were proposed in this pull request?
    fixed a few problems:
    1. addressed this 
[comment](https://github.com/apache/spark/pull/34060#discussion_r765992537)
    2. combined several `xxxOnlySupportedWithV2TableError`
    3. in CTAS and RTAS, the `if isSessionCatalog(catalog)` should not be on 
the pattern, it should be `if (isSessionCatalog(catalog) && 
!isV2Provider(provider))`. Otherwise, `c.partitioning ++ 
c.tableSpec.bucketSpec.map(_.asTransform)` is not done for non SessionCatalog 
case.
    I tried this  `c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform)` 
inside `AstBuilder` but it failed [here]( 
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala#L850)
 so I kept this in `ResolveSessionCatalog`
    
    ### Why are the changes needed?
    code cleaning up and bug fixing
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing tests
    
    Closes #34857 from huaxingao/followup.
    
    Lead-authored-by: Huaxin Gao <[email protected]>
    Co-authored-by: Huaxin Gao <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/errors/QueryCompilationErrors.scala  | 28 +--------
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 73 ++++++++++++----------
 .../spark/sql/streaming/DataStreamWriter.scala     |  4 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 38 +++++++++++
 4 files changed, 82 insertions(+), 61 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 920a748..4843051 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -479,48 +479,24 @@ object QueryCompilationErrors {
     new AnalysisException("ADD COLUMN with v1 tables cannot specify NOT NULL.")
   }
 
-  def replaceColumnsOnlySupportedWithV2TableError(): Throwable = {
-    new AnalysisException("REPLACE COLUMNS is only supported with v2 tables.")
-  }
-
-  def alterQualifiedColumnOnlySupportedWithV2TableError(): Throwable = {
-    new AnalysisException("ALTER COLUMN with qualified column is only 
supported with v2 tables.")
+  def operationOnlySupportedWithV2TableError(operation: String): Throwable = {
+    new AnalysisException(s"$operation is only supported with v2 tables.")
   }
 
   def alterColumnWithV1TableCannotSpecifyNotNullError(): Throwable = {
     new AnalysisException("ALTER COLUMN with v1 tables cannot specify NOT 
NULL.")
   }
 
-  def alterOnlySupportedWithV2TableError(): Throwable = {
-    new AnalysisException("ALTER COLUMN ... FIRST | ALTER is only supported 
with v2 tables.")
-  }
-
   def alterColumnCannotFindColumnInV1TableError(colName: String, v1Table: 
V1Table): Throwable = {
     new AnalysisException(
       s"ALTER COLUMN cannot find column $colName in v1 table. " +
         s"Available: ${v1Table.schema.fieldNames.mkString(", ")}")
   }
 
-  def renameColumnOnlySupportedWithV2TableError(): Throwable = {
-    new AnalysisException("RENAME COLUMN is only supported with v2 tables.")
-  }
-
-  def dropColumnOnlySupportedWithV2TableError(): Throwable = {
-    new AnalysisException("DROP COLUMN is only supported with v2 tables.")
-  }
-
   def invalidDatabaseNameError(quoted: String): Throwable = {
     new AnalysisException(s"The database name is not valid: $quoted")
   }
 
-  def replaceTableOnlySupportedWithV2TableError(): Throwable = {
-    new AnalysisException("REPLACE TABLE is only supported with v2 tables.")
-  }
-
-  def replaceTableAsSelectOnlySupportedWithV2TableError(): Throwable = {
-    new AnalysisException("REPLACE TABLE AS SELECT is only supported with v2 
tables.")
-  }
-
   def cannotDropViewWithDropTableError(): Throwable = {
     new AnalysisException("Cannot drop a view with DROP TABLE. Please use DROP 
VIEW instead")
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index d5f99bf..15798e0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -56,17 +56,19 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
       AlterTableAddColumnsCommand(ident.asTableIdentifier, 
cols.map(convertToStructField))
 
     case ReplaceColumns(ResolvedV1TableIdentifier(_), _) =>
-      throw QueryCompilationErrors.replaceColumnsOnlySupportedWithV2TableError
+      throw 
QueryCompilationErrors.operationOnlySupportedWithV2TableError("REPLACE COLUMNS")
 
     case a @ AlterColumn(ResolvedV1TableAndIdentifier(table, ident), _, _, _, 
_, _) =>
       if (a.column.name.length > 1) {
-        throw 
QueryCompilationErrors.alterQualifiedColumnOnlySupportedWithV2TableError
+        throw QueryCompilationErrors
+          .operationOnlySupportedWithV2TableError("ALTER COLUMN with qualified 
column")
       }
       if (a.nullable.isDefined) {
         throw 
QueryCompilationErrors.alterColumnWithV1TableCannotSpecifyNotNullError
       }
       if (a.position.isDefined) {
-        throw QueryCompilationErrors.alterOnlySupportedWithV2TableError
+        throw QueryCompilationErrors
+          .operationOnlySupportedWithV2TableError("ALTER COLUMN ... FIRST | 
ALTER")
       }
       val builder = new MetadataBuilder
       // Add comment to metadata
@@ -88,10 +90,10 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
       AlterTableChangeColumnCommand(ident.asTableIdentifier, colName, 
newColumn)
 
     case RenameColumn(ResolvedV1TableIdentifier(_), _, _) =>
-      throw QueryCompilationErrors.renameColumnOnlySupportedWithV2TableError
+      throw 
QueryCompilationErrors.operationOnlySupportedWithV2TableError("RENAME COLUMN")
 
     case DropColumns(ResolvedV1TableIdentifier(_), _) =>
-      throw QueryCompilationErrors.dropColumnOnlySupportedWithV2TableError
+      throw 
QueryCompilationErrors.operationOnlySupportedWithV2TableError("DROP COLUMN")
 
     case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
       AlterTableSetPropertiesCommand(ident.asTableIdentifier, props, isView = 
false)
@@ -145,35 +147,24 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
     // session catalog and the table provider is not v2.
     case c @ CreateTable(ResolvedDBObjectName(catalog, name), _, _, _, _) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
-        c.tableSpec.provider,
-        c.tableSpec.options,
-        c.tableSpec.location,
-        c.tableSpec.serde,
+        c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, 
c.tableSpec.serde,
         ctas = false)
       if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
-        val tableDesc = buildCatalogTable(name.asTableIdentifier, 
c.tableSchema,
-          c.partitioning, c.tableSpec.bucketSpec, c.tableSpec.properties, 
provider,
-          c.tableSpec.location, c.tableSpec.comment, storageFormat,
-          c.tableSpec.external)
-        val mode = if (c.ignoreIfExists) SaveMode.Ignore else 
SaveMode.ErrorIfExists
-        CreateTableV1(tableDesc, mode, None)
+        constructV1TableCmd(None, c.tableSpec, name, c.tableSchema, 
c.partitioning,
+          c.ignoreIfExists, storageFormat, provider)
       } else {
         val newTableSpec = c.tableSpec.copy(bucketSpec = None)
         c.copy(partitioning = c.partitioning ++ 
c.tableSpec.bucketSpec.map(_.asTransform),
           tableSpec = newTableSpec)
       }
 
-    case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, 
_, _)
-      if isSessionCatalog(catalog) =>
+    case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, 
_, _) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
         c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, 
c.tableSpec.serde,
         ctas = true)
-      if (!isV2Provider(provider)) {
-        val tableDesc = buildCatalogTable(name.asTableIdentifier, new 
StructType,
-          c.partitioning, c.tableSpec.bucketSpec, c.tableSpec.properties, 
provider,
-          c.tableSpec.location, c.tableSpec.comment, storageFormat, 
c.tableSpec.external)
-        val mode = if (c.ignoreIfExists) SaveMode.Ignore else 
SaveMode.ErrorIfExists
-        CreateTableV1(tableDesc, mode, Some(c.query))
+      if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
+        constructV1TableCmd(Some(c.query), c.tableSpec, name, new StructType, 
c.partitioning,
+          c.ignoreIfExists, storageFormat, provider)
       } else {
         val newTableSpec = c.tableSpec.copy(bucketSpec = None)
         c.copy(partitioning = c.partitioning ++ 
c.tableSpec.bucketSpec.map(_.asTransform),
@@ -189,21 +180,21 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
     // For REPLACE TABLE [AS SELECT], we should fail if the catalog is 
resolved to the
     // session catalog and the table provider is not v2.
     case c @ ReplaceTable(
-      ResolvedDBObjectName(catalog, name), _, _, _, _) =>
+      ResolvedDBObjectName(catalog, _), _, _, _, _) =>
       val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
       if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
-        throw QueryCompilationErrors.replaceTableOnlySupportedWithV2TableError
+        throw 
QueryCompilationErrors.operationOnlySupportedWithV2TableError("REPLACE TABLE")
       } else {
         val newTableSpec = c.tableSpec.copy(bucketSpec = None)
         c.copy(partitioning = c.partitioning ++ 
c.tableSpec.bucketSpec.map(_.asTransform),
           tableSpec = newTableSpec)
       }
 
-    case c @ ReplaceTableAsSelect(ResolvedDBObjectName(catalog, _), _, _, _, 
_, _)
-        if isSessionCatalog(catalog) =>
+    case c @ ReplaceTableAsSelect(ResolvedDBObjectName(catalog, _), _, _, _, 
_, _) =>
       val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
-      if (!isV2Provider(provider)) {
-        throw 
QueryCompilationErrors.replaceTableAsSelectOnlySupportedWithV2TableError
+      if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
+        throw QueryCompilationErrors
+          .operationOnlySupportedWithV2TableError("REPLACE TABLE AS SELECT")
       } else {
         val newTableSpec = c.tableSpec.copy(bucketSpec = None)
         c.copy(partitioning = c.partitioning ++ 
c.tableSpec.bucketSpec.map(_.asTransform),
@@ -294,7 +285,7 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
         ident.asTableIdentifier,
         Seq(partitionSpec).asUnresolvedPartitionSpecs.map(_.spec).headOption)
 
-    case s @ ShowPartitions(
+    case ShowPartitions(
         ResolvedV1TableOrViewIdentifier(ident),
         pattern @ (None | Some(UnresolvedPartitionSpec(_, _))), output) =>
       ShowPartitionsCommand(
@@ -302,7 +293,7 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
         output,
         pattern.map(_.asInstanceOf[UnresolvedPartitionSpec].spec))
 
-    case s @ ShowColumns(ResolvedV1TableOrViewIdentifier(ident), ns, output) =>
+    case ShowColumns(ResolvedV1TableOrViewIdentifier(ident), ns, output) =>
       val v1TableName = ident.asTableIdentifier
       val resolver = conf.resolver
       val db = ns match {
@@ -385,10 +376,10 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
       }
 
     // If target is view, force use v1 command
-    case s @ ShowTableProperties(ResolvedViewIdentifier(ident), propertyKey, 
output) =>
+    case ShowTableProperties(ResolvedViewIdentifier(ident), propertyKey, 
output) =>
       ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey, output)
 
-    case s @ ShowTableProperties(ResolvedV1TableIdentifier(ident), 
propertyKey, output)
+    case ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey, 
output)
         if conf.useV1Command =>
       ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey, output)
 
@@ -435,6 +426,22 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
       RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName)
   }
 
+  private def constructV1TableCmd(
+      query: Option[LogicalPlan],
+      tableSpec: TableSpec,
+      name: Seq[String],
+      tableSchema: StructType,
+      partitioning: Seq[Transform],
+      ignoreIfExists: Boolean,
+      storageFormat: CatalogStorageFormat,
+      provider: String): CreateTableV1 = {
+    val tableDesc = buildCatalogTable(name.asTableIdentifier, tableSchema,
+        partitioning, tableSpec.bucketSpec, tableSpec.properties, provider,
+        tableSpec.location, tableSpec.comment, storageFormat, 
tableSpec.external)
+    val mode = if (ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+    CreateTableV1(tableDesc, mode, query)
+  }
+
   private def getStorageFormatAndProvider(
       provider: Option[String],
       options: Map[String, String],
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 2d3c898..f72d03e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -289,7 +289,7 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
        * Note, currently the new table creation by this API doesn't fully 
cover the V2 table.
        * TODO (SPARK-33638): Full support of v2 table creation
        */
-      val tableProperties = TableSpec(
+      val tableSpec = TableSpec(
         None,
         Map.empty[String, String],
         Some(source),
@@ -304,7 +304,7 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
           isNamespace = false),
         df.schema.asNullable,
         partitioningColumns.getOrElse(Nil).asTransforms.toSeq,
-        tableProperties,
+        tableSpec,
         ignoreIfExists = false)
       Dataset.ofRows(df.sparkSession, cmd)
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 4c5a001..0b9914f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -407,6 +407,44 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("SPARK-36850: CreateTableAsSelect partitions can be specified using " +
+    "PARTITIONED BY and/or CLUSTERED BY") {
+    val identifier = "testcat.table_name"
+    withTable(identifier) {
+      spark.sql(s"CREATE TABLE $identifier USING foo PARTITIONED BY (id) " +
+        s"CLUSTERED BY (data) INTO 4 BUCKETS AS SELECT * FROM source")
+      val describe = spark.sql(s"DESCRIBE $identifier")
+      val part1 = describe
+        .filter("col_name = 'Part 0'")
+        .select("data_type").head.getString(0)
+      assert(part1 === "id")
+      val part2 = describe
+        .filter("col_name = 'Part 1'")
+        .select("data_type").head.getString(0)
+      assert(part2 === "bucket(4, data)")
+    }
+  }
+
+  test("SPARK-36850: ReplaceTableAsSelect partitions can be specified using " +
+    "PARTITIONED BY and/or CLUSTERED BY") {
+    val identifier = "testcat.table_name"
+    withTable(identifier) {
+      spark.sql(s"CREATE TABLE $identifier USING foo " +
+        "AS SELECT id FROM source")
+      spark.sql(s"REPLACE TABLE $identifier USING foo PARTITIONED BY (id) " +
+        s"CLUSTERED BY (data) INTO 4 BUCKETS AS SELECT * FROM source")
+      val describe = spark.sql(s"DESCRIBE $identifier")
+      val part1 = describe
+        .filter("col_name = 'Part 0'")
+        .select("data_type").head.getString(0)
+      assert(part1 === "id")
+      val part2 = describe
+        .filter("col_name = 'Part 1'")
+        .select("data_type").head.getString(0)
+      assert(part2 === "bucket(4, data)")
+    }
+  }
+
   test("SPARK-37545: CreateTableAsSelect should store location as qualified") {
     val basicIdentifier = "testcat.table_name"
     val atomicIdentifier = "testcat_atomic.table_name"

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to