This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ab7509fa124 [SPARK-46849][SQL] Run optimizer on CREATE TABLE column defaults
7ab7509fa124 is described below

commit 7ab7509fa12418ff5f93782670b7e939c055703a
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Fri Jan 26 12:28:10 2024 -0800

    [SPARK-46849][SQL] Run optimizer on CREATE TABLE column defaults
    
    ### What changes were proposed in this pull request?
    
    This PR updates Catalyst to run the optimizer over `CREATE TABLE` column default expressions.
    
    ### Why are the changes needed?
    
    This helps speed up future commands that assign default values within the table.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    The functionality is covered by existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #44876 from dtenedor/analyze-column-defaults.
    
    Authored-by: Daniel Tenedorio <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 19 +++++++++++++++-
 .../sql/catalyst/plans/logical/v2Commands.scala    | 18 ++++++++++++++-
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  | 15 +++++++++++++
 .../sql/connector/catalog/CatalogV2Util.scala      | 26 ++++++++++++++++++----
 .../spark/sql/catalyst/parser/DDLParserSuite.scala |  2 +-
 .../catalyst/analysis/ResolveSessionCatalog.scala  |  2 +-
 .../datasources/v2/DataSourceV2Strategy.scala      | 16 +++++++------
 7 files changed, 83 insertions(+), 15 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 54c4343e7ff9..d147d22e4b13 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.parser
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
+import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, Set}
 import scala.jdk.CollectionConverters._
 import scala.util.{Left, Right}
@@ -3997,6 +3998,22 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
     val tableSpec = UnresolvedTableSpec(properties, provider, options, 
location, comment,
       serdeInfo, external)
 
+    // Parse column defaults from the table into separate expressions in the 
CREATE TABLE operator.
+    val specifiedDefaults: mutable.Map[Int, Expression] = mutable.Map.empty
+    Option(ctx.createOrReplaceTableColTypeList()).foreach {
+      _.createOrReplaceTableColType().asScala.zipWithIndex.foreach { case 
(typeContext, index) =>
+        typeContext.colDefinitionOption().asScala.foreach { option =>
+          Option(option.defaultExpression()).foreach { defaultExprContext =>
+            specifiedDefaults.update(index, 
expression(defaultExprContext.expression()))
+          }
+        }
+      }
+    }
+    val defaultValueExpressions: Seq[Option[Expression]] =
+      (0 until columns.size).map { index: Int =>
+        specifiedDefaults.get(index)
+      }
+
     Option(ctx.query).map(plan) match {
       case Some(_) if columns.nonEmpty =>
         operationNotAllowed(
@@ -4018,7 +4035,7 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
         // with data type.
         val schema = StructType(columns ++ partCols)
         CreateTable(withIdentClause(identifierContext, 
UnresolvedIdentifier(_)),
-          schema, partitioning, tableSpec, ignoreIfExists = ifNotExists)
+          schema, partitioning, tableSpec, ignoreIfExists = ifNotExists, 
defaultValueExpressions)
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index b17926818900..30be30cb2e04 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -456,13 +456,16 @@ trait V2CreateTablePlan extends LogicalPlan {
 
 /**
  * Create a new table with a v2 catalog.
+ * The [[defaultValueExpressions]] hold optional default value expressions to 
use when creating the
+ * table, mapping 1:1 with the fields in [[tableSchema]].
  */
 case class CreateTable(
     name: LogicalPlan,
     tableSchema: StructType,
     partitioning: Seq[Transform],
     tableSpec: TableSpecBase,
-    ignoreIfExists: Boolean)
+    ignoreIfExists: Boolean,
+    defaultValueExpressions: Seq[Option[Expression]])
   extends UnaryCommand with V2CreateTablePlan {
 
   override def child: LogicalPlan = name
@@ -475,6 +478,19 @@ case class CreateTable(
   }
 }
 
+/** This is a helper to build [[CreateTable]] instances with no column default 
value expressions. */
+object CreateTable {
+  def apply(
+      name: LogicalPlan,
+      tableSchema: StructType,
+      partitioning: Seq[Transform],
+      tableSpec: TableSpecBase,
+      ignoreIfExists: Boolean): CreateTable = {
+    val defaultValueExpressions = Seq.fill(tableSchema.fields.length)(None)
+    CreateTable(name, tableSchema, partitioning, tableSpec, ignoreIfExists, 
defaultValueExpressions)
+  }
+}
+
 /**
  * Create a new table from a select query with a v2 catalog.
  */
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index bf64399c5659..4a04f10cfbc4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -295,6 +295,20 @@ object ResolveDefaultColumns extends QueryErrorsBase with 
ResolveDefaultColumnsU
       case Project(Seq(a: Alias), OneRowRelation()) => a.child
     }.get
     // Perform implicit coercion from the provided expression type to the 
required column type.
+    coerceDefaultValue(analyzed, dataType, statementType, colName, defaultSQL)
+  }
+
+  /**
+   * Returns the result of type coercion from [[analyzed]] to [[dataType]], or 
throws an error if
+   * the expression is not coercible.
+   */
+  def coerceDefaultValue(
+      analyzed: Expression,
+      dataType: DataType,
+      statementType: String,
+      colName: String,
+      defaultSQL: String): Expression = {
+    // Perform implicit coercion from the provided expression type to the 
required column type.
     if (dataType == analyzed.dataType) {
       analyzed
     } else if (Cast.canUpCast(analyzed.dataType, dataType)) {
@@ -327,6 +341,7 @@ object ResolveDefaultColumns extends QueryErrorsBase with 
ResolveDefaultColumnsU
       }
     }
   }
+
   /**
    * Normalizes a schema field name suitable for use in looking up into maps 
keyed by schema field
    * names.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 06887b0b9503..9eb95cd9c4ca 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.CurrentUserContext
 import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, 
NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, 
NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
-import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
 import org.apache.spark.sql.catalyst.util.GeneratedColumn
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
@@ -481,10 +481,23 @@ private[sql] object CatalogV2Util {
    * createTable and related APIs.
    */
   def structTypeToV2Columns(schema: StructType): Array[Column] = {
-    schema.fields.map(structFieldToV2Column)
+    schema.fields.map(structFieldToV2Column(_, useDefault = None, 
statementType = ""))
   }
 
-  private def structFieldToV2Column(f: StructField): Column = {
+  /** Same as above, but using the column defaults provided instead of 
extracting from metadata. */
+  def structTypeToV2ColumnsWithDefaults(
+      schema: StructType,
+      defaults: Seq[Option[Expression]],
+      statementType: String): Array[Column] = {
+    // Extend 'defaults' to be the same length as 'schema.fields' by filling 
in None.
+    val defaultsWithNones = defaults ++ Seq.fill(schema.fields.length - 
defaults.length)(None)
+    schema.fields.zip(defaultsWithNones).map { case (field, default) =>
+      structFieldToV2Column(field, default, statementType)
+    }
+  }
+
+  private def structFieldToV2Column(
+      f: StructField, useDefault: Option[Expression], statementType: String): 
Column = {
     def metadataAsJson(metadata: Metadata): String = {
       if (metadata == Metadata.empty) {
         null
@@ -513,7 +526,12 @@ private[sql] object CatalogV2Util {
     }
 
     if (isDefaultColumn) {
-      val e = analyze(f, EXISTS_DEFAULT_COLUMN_METADATA_KEY)
+      val e: Expression = useDefault.map { analyzed =>
+        coerceDefaultValue(
+          analyzed, f.dataType, statementType, f.name, 
f.getCurrentDefaultValue().get)
+      }.getOrElse {
+        analyze(f, EXISTS_DEFAULT_COLUMN_METADATA_KEY)
+      }
       assert(e.resolved && e.foldable,
         "The existence default value must be a simple SQL string that is 
resolved and foldable, " +
           "but got: " + f.getExistenceDefaultValue().get)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 03b31f1f03e4..c8b99aed5cfa 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2701,7 +2701,7 @@ class DDLParserSuite extends AnalysisTest {
     val createTableResult =
       CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithDefaultColumn,
         Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], 
Some("parquet"),
-         OptionList(Seq.empty), None, None, None, false), false)
+         OptionList(Seq.empty), None, None, None, false), false, Seq(None, 
Some(Literal("abc"))))
     // Parse the CREATE TABLE statement twice, swapping the order of the NOT 
NULL and DEFAULT
     // options, to make sure that the parser accepts any ordering of these 
options.
     comparePlans(parsePlan(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index c00a43318035..94c4cb4f98d3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -160,7 +160,7 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
 
     // For CREATE TABLE [AS SELECT], we should use the v1 command if the 
catalog is resolved to the
     // session catalog and the table provider is not v2.
-    case c @ CreateTable(ResolvedV1Identifier(ident), _, _, tableSpec: 
TableSpec, _) =>
+    case c @ CreateTable(ResolvedV1Identifier(ident), _, _, tableSpec: 
TableSpec, _, _) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
         c.tableSpec.provider, tableSpec.options, c.tableSpec.location, 
c.tableSpec.serde,
         ctas = false)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 3cf311017e5e..53ab7e41b51e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -33,7 +33,7 @@ import 
org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn, 
ResolveDefaultColumns, V2ExpressionBuilder}
 import org.apache.spark.sql.connector.catalog.{Identifier, 
StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, 
SupportsPartitionManagement, SupportsWrite, Table, TableCapability, 
TableCatalog, TruncatableTable}
-import 
org.apache.spark.sql.connector.catalog.CatalogV2Util.structTypeToV2Columns
+import 
org.apache.spark.sql.connector.catalog.CatalogV2Util.{structTypeToV2Columns, 
structTypeToV2ColumnsWithDefaults}
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, 
LiteralValue}
 import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not => 
V2Not, Or => V2Or, Predicate}
@@ -174,16 +174,18 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
       WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), 
customMetrics) :: Nil
 
     case CreateTable(ResolvedIdentifier(catalog, ident), schema, partitioning,
-        tableSpec: TableSpec, ifNotExists) =>
+        tableSpec: TableSpec, ifNotExists, defaultValueExpressions) =>
       ResolveDefaultColumns.validateCatalogForDefaultValue(schema, 
catalog.asTableCatalog, ident)
+      val statementType = "CREATE TABLE"
       val newSchema: StructType =
-        ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-          schema, "CREATE TABLE")
+        
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(schema, 
statementType)
       GeneratedColumn.validateGeneratedColumns(
-        newSchema, catalog.asTableCatalog, ident, "CREATE TABLE")
+        newSchema, catalog.asTableCatalog, ident, statementType)
 
-      CreateTableExec(catalog.asTableCatalog, ident, 
structTypeToV2Columns(newSchema),
-        partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
+      val columns =
+        structTypeToV2ColumnsWithDefaults(newSchema, defaultValueExpressions, 
statementType)
+      CreateTableExec(catalog.asTableCatalog, ident, columns, partitioning,
+        qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
 
     case CreateTableAsSelect(ResolvedIdentifier(catalog, ident), parts, query, 
tableSpec: TableSpec,
         options, ifNotExists, true) =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to