This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7ab7509fa124 [SPARK-46849][SQL] Run optimizer on CREATE TABLE column defaults
7ab7509fa124 is described below
commit 7ab7509fa12418ff5f93782670b7e939c055703a
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Fri Jan 26 12:28:10 2024 -0800
[SPARK-46849][SQL] Run optimizer on CREATE TABLE column defaults
### What changes were proposed in this pull request?
This PR updates Catalyst to run the optimizer over `CREATE TABLE` column
default expressions.
### Why are the changes needed?
This helps speed up future commands that assign default values within the
table.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
The functionality is covered by existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44876 from dtenedor/analyze-column-defaults.
Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/catalyst/parser/AstBuilder.scala | 19 +++++++++++++++-
.../sql/catalyst/plans/logical/v2Commands.scala | 18 ++++++++++++++-
.../catalyst/util/ResolveDefaultColumnsUtil.scala | 15 +++++++++++++
.../sql/connector/catalog/CatalogV2Util.scala | 26 ++++++++++++++++++----
.../spark/sql/catalyst/parser/DDLParserSuite.scala | 2 +-
.../catalyst/analysis/ResolveSessionCatalog.scala | 2 +-
.../datasources/v2/DataSourceV2Strategy.scala | 16 +++++++------
7 files changed, 83 insertions(+), 15 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 54c4343e7ff9..d147d22e4b13 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.parser
import java.util.Locale
import java.util.concurrent.TimeUnit
+import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, Set}
import scala.jdk.CollectionConverters._
import scala.util.{Left, Right}
@@ -3997,6 +3998,22 @@ class AstBuilder extends DataTypeAstBuilder with
SQLConfHelper with Logging {
val tableSpec = UnresolvedTableSpec(properties, provider, options,
location, comment,
serdeInfo, external)
+ // Parse column defaults from the table into separate expressions in the
CREATE TABLE operator.
+ val specifiedDefaults: mutable.Map[Int, Expression] = mutable.Map.empty
+ Option(ctx.createOrReplaceTableColTypeList()).foreach {
+ _.createOrReplaceTableColType().asScala.zipWithIndex.foreach { case
(typeContext, index) =>
+ typeContext.colDefinitionOption().asScala.foreach { option =>
+ Option(option.defaultExpression()).foreach { defaultExprContext =>
+ specifiedDefaults.update(index,
expression(defaultExprContext.expression()))
+ }
+ }
+ }
+ }
+ val defaultValueExpressions: Seq[Option[Expression]] =
+ (0 until columns.size).map { index: Int =>
+ specifiedDefaults.get(index)
+ }
+
Option(ctx.query).map(plan) match {
case Some(_) if columns.nonEmpty =>
operationNotAllowed(
@@ -4018,7 +4035,7 @@ class AstBuilder extends DataTypeAstBuilder with
SQLConfHelper with Logging {
// with data type.
val schema = StructType(columns ++ partCols)
CreateTable(withIdentClause(identifierContext,
UnresolvedIdentifier(_)),
- schema, partitioning, tableSpec, ignoreIfExists = ifNotExists)
+ schema, partitioning, tableSpec, ignoreIfExists = ifNotExists,
defaultValueExpressions)
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index b17926818900..30be30cb2e04 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -456,13 +456,16 @@ trait V2CreateTablePlan extends LogicalPlan {
/**
* Create a new table with a v2 catalog.
+ * The [[defaultValueExpressions]] hold optional default value expressions to
use when creating the
+ * table, mapping 1:1 with the fields in [[tableSchema]].
*/
case class CreateTable(
name: LogicalPlan,
tableSchema: StructType,
partitioning: Seq[Transform],
tableSpec: TableSpecBase,
- ignoreIfExists: Boolean)
+ ignoreIfExists: Boolean,
+ defaultValueExpressions: Seq[Option[Expression]])
extends UnaryCommand with V2CreateTablePlan {
override def child: LogicalPlan = name
@@ -475,6 +478,19 @@ case class CreateTable(
}
}
+/** This is a helper to build [[CreateTable]] instances with no column default
value expressions. */
+object CreateTable {
+ def apply(
+ name: LogicalPlan,
+ tableSchema: StructType,
+ partitioning: Seq[Transform],
+ tableSpec: TableSpecBase,
+ ignoreIfExists: Boolean): CreateTable = {
+ val defaultValueExpressions = Seq.fill(tableSchema.fields.length)(None)
+ CreateTable(name, tableSchema, partitioning, tableSpec, ignoreIfExists,
defaultValueExpressions)
+ }
+}
+
/**
* Create a new table from a select query with a v2 catalog.
*/
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index bf64399c5659..4a04f10cfbc4 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -295,6 +295,20 @@ object ResolveDefaultColumns extends QueryErrorsBase with
ResolveDefaultColumnsU
case Project(Seq(a: Alias), OneRowRelation()) => a.child
}.get
// Perform implicit coercion from the provided expression type to the
required column type.
+ coerceDefaultValue(analyzed, dataType, statementType, colName, defaultSQL)
+ }
+
+ /**
+ * Returns the result of type coercion from [[analyzed]] to [[dataType]], or
throws an error if
+ * the expression is not coercible.
+ */
+ def coerceDefaultValue(
+ analyzed: Expression,
+ dataType: DataType,
+ statementType: String,
+ colName: String,
+ defaultSQL: String): Expression = {
+ // Perform implicit coercion from the provided expression type to the
required column type.
if (dataType == analyzed.dataType) {
analyzed
} else if (Cast.canUpCast(analyzed.dataType, dataType)) {
@@ -327,6 +341,7 @@ object ResolveDefaultColumns extends QueryErrorsBase with
ResolveDefaultColumnsU
}
}
}
+
/**
* Normalizes a schema field name suitable for use in looking up into maps
keyed by schema field
* names.
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 06887b0b9503..9eb95cd9c4ca 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.CurrentUserContext
import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion,
NamedRelation, NoSuchDatabaseException, NoSuchFunctionException,
NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
-import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
import org.apache.spark.sql.catalyst.util.GeneratedColumn
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
@@ -481,10 +481,23 @@ private[sql] object CatalogV2Util {
* createTable and related APIs.
*/
def structTypeToV2Columns(schema: StructType): Array[Column] = {
- schema.fields.map(structFieldToV2Column)
+ schema.fields.map(structFieldToV2Column(_, useDefault = None,
statementType = ""))
}
- private def structFieldToV2Column(f: StructField): Column = {
+ /** Same as above, but using the column defaults provided instead of
extracting from metadata. */
+ def structTypeToV2ColumnsWithDefaults(
+ schema: StructType,
+ defaults: Seq[Option[Expression]],
+ statementType: String): Array[Column] = {
+ // Extend 'defaults' to be the same length as 'schema.fields' by filling
in None.
+ val defaultsWithNones = defaults ++ Seq.fill(schema.fields.length -
defaults.length)(None)
+ schema.fields.zip(defaultsWithNones).map { case (field, default) =>
+ structFieldToV2Column(field, default, statementType)
+ }
+ }
+
+ private def structFieldToV2Column(
+ f: StructField, useDefault: Option[Expression], statementType: String):
Column = {
def metadataAsJson(metadata: Metadata): String = {
if (metadata == Metadata.empty) {
null
@@ -513,7 +526,12 @@ private[sql] object CatalogV2Util {
}
if (isDefaultColumn) {
- val e = analyze(f, EXISTS_DEFAULT_COLUMN_METADATA_KEY)
+ val e: Expression = useDefault.map { analyzed =>
+ coerceDefaultValue(
+ analyzed, f.dataType, statementType, f.name,
f.getCurrentDefaultValue().get)
+ }.getOrElse {
+ analyze(f, EXISTS_DEFAULT_COLUMN_METADATA_KEY)
+ }
assert(e.resolved && e.foldable,
"The existence default value must be a simple SQL string that is
resolved and foldable, " +
"but got: " + f.getExistenceDefaultValue().get)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 03b31f1f03e4..c8b99aed5cfa 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2701,7 +2701,7 @@ class DDLParserSuite extends AnalysisTest {
val createTableResult =
CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithDefaultColumn,
Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String],
Some("parquet"),
- OptionList(Seq.empty), None, None, None, false), false)
+ OptionList(Seq.empty), None, None, None, false), false, Seq(None,
Some(Literal("abc"))))
// Parse the CREATE TABLE statement twice, swapping the order of the NOT
NULL and DEFAULT
// options, to make sure that the parser accepts any ordering of these
options.
comparePlans(parsePlan(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index c00a43318035..94c4cb4f98d3 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -160,7 +160,7 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
// For CREATE TABLE [AS SELECT], we should use the v1 command if the
catalog is resolved to the
// session catalog and the table provider is not v2.
- case c @ CreateTable(ResolvedV1Identifier(ident), _, _, tableSpec:
TableSpec, _) =>
+ case c @ CreateTable(ResolvedV1Identifier(ident), _, _, tableSpec:
TableSpec, _, _) =>
val (storageFormat, provider) = getStorageFormatAndProvider(
c.tableSpec.provider, tableSpec.options, c.tableSpec.location,
c.tableSpec.serde,
ctas = false)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 3cf311017e5e..53ab7e41b51e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -33,7 +33,7 @@ import
org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn,
ResolveDefaultColumns, V2ExpressionBuilder}
import org.apache.spark.sql.connector.catalog.{Identifier,
StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces,
SupportsPartitionManagement, SupportsWrite, Table, TableCapability,
TableCatalog, TruncatableTable}
-import
org.apache.spark.sql.connector.catalog.CatalogV2Util.structTypeToV2Columns
+import
org.apache.spark.sql.connector.catalog.CatalogV2Util.{structTypeToV2Columns,
structTypeToV2ColumnsWithDefaults}
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference,
LiteralValue}
import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not =>
V2Not, Or => V2Or, Predicate}
@@ -174,16 +174,18 @@ class DataSourceV2Strategy(session: SparkSession) extends
Strategy with Predicat
WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query),
customMetrics) :: Nil
case CreateTable(ResolvedIdentifier(catalog, ident), schema, partitioning,
- tableSpec: TableSpec, ifNotExists) =>
+ tableSpec: TableSpec, ifNotExists, defaultValueExpressions) =>
ResolveDefaultColumns.validateCatalogForDefaultValue(schema,
catalog.asTableCatalog, ident)
+ val statementType = "CREATE TABLE"
val newSchema: StructType =
- ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
- schema, "CREATE TABLE")
+
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(schema,
statementType)
GeneratedColumn.validateGeneratedColumns(
- newSchema, catalog.asTableCatalog, ident, "CREATE TABLE")
+ newSchema, catalog.asTableCatalog, ident, statementType)
- CreateTableExec(catalog.asTableCatalog, ident,
structTypeToV2Columns(newSchema),
- partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
+ val columns =
+ structTypeToV2ColumnsWithDefaults(newSchema, defaultValueExpressions,
statementType)
+ CreateTableExec(catalog.asTableCatalog, ident, columns, partitioning,
+ qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
case CreateTableAsSelect(ResolvedIdentifier(catalog, ident), parts, query,
tableSpec: TableSpec,
options, ifNotExists, true) =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]