This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e95738  [SPARK-38336][SQL] Support DEFAULT column values in 
CREATE/REPLACE TABLE statements
4e95738 is described below

commit 4e95738fdfc334c25f44689ff8c2db5aa7c726f2
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Sat Mar 26 12:09:56 2022 +0800

    [SPARK-38336][SQL] Support DEFAULT column values in CREATE/REPLACE TABLE 
statements
    
    ### What changes were proposed in this pull request?
    
    Extend CREATE TABLE and REPLACE TABLE statements to support columns with 
DEFAULT values. This information will be stored in the column metadata.
    
    ### Why are the changes needed?
    
    This builds the foundation for future work (not included in this PR) to 
support INSERT INTO statements, which may then omit the default values or refer 
to them explicitly with the DEFAULT keyword, in which case the Spark analyzer 
will automatically insert the appropriate corresponding values in the right 
places.
    
    Example:
    ```
    CREATE TABLE T(a INT DEFAULT 4, b INT NOT NULL DEFAULT 5);
    INSERT INTO T VALUES (1);
    INSERT INTO T VALUES (1, DEFAULT);
    INSERT INTO T VALUES (DEFAULT, 6);
    SELECT * FROM T;
    (1, 5)
    (1, 5)
    (4, 6)
    ```
    
    ### How was this patch tested?
    
    This change is covered by new and existing unit test coverage as well as 
new INSERT INTO query test cases covering a variety of positive and negative 
scenarios.
    
    Closes #35855 from dtenedor/default-cols-create-table.
    
    Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  34 ++++-
 .../sql/catalyst/util/ResolveDefaultColumns.scala  | 153 +++++++++++++++++++++
 .../spark/sql/errors/QueryParsingErrors.scala      |   4 +
 .../org/apache/spark/sql/internal/SQLConf.scala    |  13 ++
 .../catalyst/catalog/ExternalCatalogSuite.scala    |  44 ++++--
 .../sql/catalyst/catalog/SessionCatalogSuite.scala |  62 +++++++++
 .../spark/sql/catalyst/parser/DDLParserSuite.scala |  56 ++++++--
 .../datasources/v2/DataSourceV2Strategy.scala      |  15 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |  14 +-
 .../org/apache/spark/sql/hive/InsertSuite.scala    |  22 +--
 10 files changed, 369 insertions(+), 48 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 3fcd8d8..01e627f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, Set}
 
 import org.antlr.v4.runtime.{ParserRuleContext, Token}
+import org.antlr.v4.runtime.misc.Interval
 import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}
 import org.apache.commons.codec.DecoderException
 import org.apache.commons.codec.binary.Hex
@@ -39,7 +40,7 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
-import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, 
IntervalUtils}
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, 
IntervalUtils, ResolveDefaultColumns}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, 
convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, 
stringToTimestamp, stringToTimestampWithoutTimeZone}
 import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, 
TableCatalog}
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -2788,13 +2789,18 @@ class AstBuilder extends 
SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     Option(commentSpec()).map(visitCommentSpec).foreach {
       builder.putString("comment", _)
     }
+    // Add the 'DEFAULT expression' clause in the column definition, if any, 
to the column metadata.
+    Option(ctx.defaultExpression()).map(visitDefaultExpression).foreach { 
field =>
+      if (conf.getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)) {
+        // Add default to metadata
+        
builder.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, 
field)
+        
builder.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, 
field)
+      } else {
+        throw QueryParsingErrors.defaultColumnNotEnabledError(ctx)
+      }
+    }
 
-    // Process the 'DEFAULT expression' clause in the column definition, if 
any.
     val name: String = colName.getText
-    val defaultExpr = 
Option(ctx.defaultExpression()).map(visitDefaultExpression)
-    if (defaultExpr.isDefined) {
-      throw QueryParsingErrors.defaultColumnNotImplementedYetError(ctx)
-    }
 
     StructField(
       name = name,
@@ -2852,6 +2858,22 @@ class AstBuilder extends 
SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
   }
 
   /**
+   * Create a default string.
+   */
+  override def visitDefaultExpression(ctx: DefaultExpressionContext): String = 
withOrigin(ctx) {
+    val exprCtx = ctx.expression()
+    // Make sure it can be converted to Catalyst expressions.
+    expression(exprCtx)
+    // Extract the raw expression text so that we can save the user provided 
text. We don't
+    // use `Expression.sql` to avoid storing incorrect text caused by bugs in 
any expression's
+    // `sql` method. Note: `exprCtx.getText` returns a string without spaces, 
so we need to
+    // get the text from the underlying char stream instead.
+    val start = exprCtx.getStart.getStartIndex
+    val end = exprCtx.getStop.getStopIndex
+    exprCtx.getStart.getInputStream.getText(new Interval(start, end))
+  }
+
+  /**
    * Create an optional comment string.
    */
   protected def visitCommentSpecList(ctx: java.util.List[CommentSpecContext]): 
Option[String] = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumns.scala
new file mode 100644
index 0000000..1e40756
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumns.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This object contains fields to help process DEFAULT columns.
+ */
+object ResolveDefaultColumns {
+  // This column metadata indicates the default value associated with a 
particular table column that
+  // is in effect at any given time. Its value begins at the time of the 
initial CREATE/REPLACE
+  // TABLE statement with DEFAULT column definition(s), if any. It then 
changes whenever an ALTER
+  // TABLE statement SETs the DEFAULT. The intent is for this "current 
default" to be used by
+  // UPDATE, INSERT and MERGE, which evaluate each default expression for each 
row.
+  val CURRENT_DEFAULT_COLUMN_METADATA_KEY = "CURRENT_DEFAULT"
+  // This column metadata represents the default value for all existing rows 
in a table after a
+  // column has been added. This value is determined at time of CREATE TABLE, 
REPLACE TABLE, or
+  // ALTER TABLE ADD COLUMN, and never changes thereafter. The intent is for 
this "exist default"
+  // to be used by any scan when the columns in the source row are missing 
data. For example,
+  // consider the following sequence:
+  // CREATE TABLE t (c1 INT)
+  // INSERT INTO t VALUES (42)
+  // ALTER TABLE t ADD COLUMNS (c2 INT DEFAULT 43)
+  // SELECT c1, c2 FROM t
+  // In this case, the final query is expected to return 42, 43. The ALTER 
TABLE ADD COLUMNS command
+  // executed after there was already data in the table, so in order to 
enforce this invariant,
+  // we need either (1) an expensive backfill of value 43 at column c2 into 
all previous rows, or
+  // (2) indicate to each data source that selected columns missing data are 
to generate the
+  // corresponding DEFAULT value instead. We choose option (2) for efficiency, 
and represent this
+  // value as the text representation of a folded constant in the 
"EXISTS_DEFAULT" column metadata.
+  val EXISTS_DEFAULT_COLUMN_METADATA_KEY = "EXISTS_DEFAULT"
+
+  /**
+   * Finds "current default" expressions in CREATE/REPLACE TABLE columns and 
constant-folds them.
+   *
+   * The results are stored in the "exists default" metadata of the same 
columns. For example, in
+   * the event of this statement:
+   *
+   * CREATE TABLE T(a INT, b INT DEFAULT 5 + 5)
+   *
+   * This method constant-folds the "current default" value, stored in the 
CURRENT_DEFAULT metadata
+   * of the "b" column, to "10", storing the result in the "exists default" 
value within the
+   * EXISTS_DEFAULT metadata of that same column. Meanwhile the "current 
default" metadata of this
+   * "b" column retains its original value of "5 + 5".
+   *
+   * The reason for constant-folding the EXISTS_DEFAULT is to make the 
end-user visible behavior the
+   * same, after executing an ALTER TABLE ADD COLUMNS command with DEFAULT 
value, as if the system
+   * had performed an exhaustive backfill of the provided value to all 
previously existing rows in
+   * the table instead. We choose to avoid doing such a backfill because it 
would be a
+   * time-consuming and costly operation. Instead, we elect to store the 
EXISTS_DEFAULT in the
+   * column metadata for future reference when querying data out of the data 
source. In turn, each
+   * data source then takes responsibility to provide the constant-folded 
value in the
+   * EXISTS_DEFAULT metadata for such columns where the value is not present 
in storage.
+   *
+   * @param analyzer used for analyzing the result of parsing the column 
expression stored as text.
+   * @param tableSchema represents the names and types of the columns of the 
statement to process.
+   * @param statementType name of the statement being processed, such as 
INSERT; useful for errors.
+   * @return a copy of `tableSchema` with field metadata updated with the 
constant-folded values.
+   */
+  def constantFoldCurrentDefaultsToExistDefaults(
+      analyzer: Analyzer,
+      tableSchema: StructType,
+      statementType: String): StructType = {
+    if (!SQLConf.get.enableDefaultColumns) {
+      return tableSchema
+    }
+    val newFields: Seq[StructField] = tableSchema.fields.map { field =>
+      if (field.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
+        val analyzed: Expression = analyze(analyzer, field, statementType)
+        val newMetadata: Metadata = new 
MetadataBuilder().withMetadata(field.metadata)
+          .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, analyzed.sql).build()
+        field.copy(metadata = newMetadata)
+      } else {
+        field
+      }
+    }
+    StructType(newFields)
+  }
+
+  /**
+   * Parses and analyzes the DEFAULT column text in `field`, returning an 
error upon failure.
+   *
+   * @param field represents the DEFAULT column value whose "default" metadata 
to parse and analyze.
+   * @param statementType which type of statement we are running, such as 
INSERT; useful for errors.
+   * @return Result of the analysis and constant-folding operation.
+   */
+  def analyze(
+      analyzer: Analyzer,
+      field: StructField,
+      statementType: String): Expression = {
+    // Parse the expression.
+    val colText: String = 
field.metadata.getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY)
+    lazy val parser = new CatalystSqlParser()
+    val parsed: Expression = try {
+      parser.parseExpression(colText)
+    } catch {
+      case ex: ParseException =>
+        throw new AnalysisException(
+          s"Failed to execute $statementType command because the destination 
table column " +
+            s"${field.name} has a DEFAULT value of $colText which fails to 
parse as a valid " +
+            s"expression: ${ex.getMessage}")
+    }
+    // Analyze the parse result.
+    val plan = try {
+      val analyzed = analyzer.execute(Project(Seq(Alias(parsed, 
field.name)()), OneRowRelation()))
+      analyzer.checkAnalysis(analyzed)
+      ConstantFolding(analyzed)
+    } catch {
+      case ex: AnalysisException =>
+        throw new AnalysisException(
+          s"Failed to execute $statementType command because the destination 
table column " +
+            s"${field.name} has a DEFAULT value of $colText which fails to 
resolve as a valid " +
+            s"expression: ${ex.getMessage}")
+    }
+    val analyzed: Expression = plan.collectFirst {
+      case Project(Seq(a: Alias), OneRowRelation()) => a.child
+    }.get
+    // Perform implicit coercion from the provided expression type to the 
required column type.
+    if (field.dataType == analyzed.dataType) {
+      analyzed
+    } else if (Cast.canUpCast(analyzed.dataType, field.dataType)) {
+      Cast(analyzed, field.dataType)
+    } else {
+      throw new AnalysisException(
+        s"Failed to execute $statementType command because the destination 
table column " +
+          s"${field.name} has a DEFAULT value with type ${field.dataType}, but 
the " +
+          s"statement provided a value of incompatible type 
${analyzed.dataType}")
+    }
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index d055299..d40f276 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -444,4 +444,8 @@ object QueryParsingErrors {
   def defaultColumnNotImplementedYetError(ctx: ParserRuleContext): Throwable = 
{
     new ParseException("Support for DEFAULT column values is not implemented 
yet", ctx)
   }
+
+  def defaultColumnNotEnabledError(ctx: ParserRuleContext): Throwable = {
+    new ParseException("Support for DEFAULT column values is not allowed", ctx)
+  }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1bba8b6..3e1872a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2805,6 +2805,17 @@ object SQLConf {
     .booleanConf
     .createWithDefault(sys.env.get("SPARK_ANSI_SQL_MODE").contains("true"))
 
+  val ENABLE_DEFAULT_COLUMNS =
+    buildConf("spark.sql.defaultColumn.enabled")
+      .internal()
+      .doc("When true, allow CREATE TABLE, REPLACE TABLE, and ALTER COLUMN 
statements to set or " +
+        "update default values for specific columns. Following INSERT, MERGE, 
and UPDATE " +
+        "statements may then omit these values and their values will be 
injected automatically " +
+        "instead.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val ENFORCE_RESERVED_KEYWORDS = 
buildConf("spark.sql.ansi.enforceReservedKeywords")
     .doc(s"When true and '${ANSI_ENABLED.key}' is true, the Spark SQL parser 
enforces the ANSI " +
       "reserved keywords and forbids SQL queries that use reserved keywords as 
alias names " +
@@ -4305,6 +4316,8 @@ class SQLConf extends Serializable with Logging {
 
   def ansiEnabled: Boolean = getConf(ANSI_ENABLED)
 
+  def enableDefaultColumns: Boolean = getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)
+
   def enforceReservedKeywords: Boolean = ansiEnabled && 
getConf(ENFORCE_RESERVED_KEYWORDS)
 
   def strictIndexOperator: Boolean = ansiEnabled && 
getConf(ANSI_STRICT_INDEX_OPERATOR)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index f791f77..d26501b 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -27,10 +27,10 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, 
NoSuchDatabaseException, NoSuchFunctionException}
-import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, 
NoSuchDatabaseException, NoSuchFunctionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -1025,16 +1025,44 @@ abstract class CatalogTestUtils {
 
   def newTable(name: String, db: String): CatalogTable = newTable(name, 
Some(db))
 
-  def newTable(name: String, database: Option[String] = None): CatalogTable = {
+  def newTable(
+      name: String,
+      database: Option[String] = None,
+      defaultColumns: Boolean = false): CatalogTable = {
     CatalogTable(
       identifier = TableIdentifier(name, database),
       tableType = CatalogTableType.EXTERNAL,
       storage = storageFormat.copy(locationUri = 
Some(Utils.createTempDir().toURI)),
-      schema = new StructType()
-        .add("col1", "int")
-        .add("col2", "string")
-        .add("a", "int")
-        .add("b", "string"),
+      schema = if (defaultColumns) {
+        new StructType()
+          .add("col1", "int")
+          .add("col2", "string")
+          .add("a", IntegerType, nullable = true,
+            new MetadataBuilder().putString(
+              ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, 
"42").build())
+          .add("b", StringType, nullable = false,
+            new MetadataBuilder().putString(
+              ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, 
"\"abc\"").build())
+          // The default value fails to parse.
+          .add("c", LongType, nullable = false,
+            new MetadataBuilder().putString(
+              ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, 
"_@#$%").build())
+          // The default value fails to resolve.
+          .add("d", LongType, nullable = false,
+            new MetadataBuilder().putString(
+              ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY,
+              "(select min(x) from badtable)").build())
+          // The default value fails to coerce to the required type.
+          .add("e", BooleanType, nullable = false,
+            new MetadataBuilder().putString(
+              ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "41 + 
1").build())
+      } else {
+        new StructType()
+          .add("col1", "int")
+          .add("col2", "string")
+          .add("a", "int")
+          .add("b", "string")
+      },
       provider = Some(defaultProvider),
       partitionColumnNames = Seq("a", "b"),
       bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index b134085..8769a6f 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LogicalPlan, 
Project, Range, SubqueryAlias, View}
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -120,6 +121,67 @@ abstract class SessionCatalogSuite extends AnalysisTest 
with Eventually {
     assert(e.contains(s"`$name` is not a valid name for tables/databases."))
   }
 
+  test("create table with default columns") {
+    withBasicCatalog { catalog =>
+      assert(catalog.externalCatalog.listTables("db1").isEmpty)
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", 
"tbl2"))
+      catalog.createTable(newTable(
+        "tbl3", Some("db1"), defaultColumns = true), ignoreIfExists = false)
+      catalog.createTable(newTable(
+        "tbl3", Some("db2"), defaultColumns = true), ignoreIfExists = false)
+      assert(catalog.externalCatalog.listTables("db1").toSet == Set("tbl3"))
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", 
"tbl2", "tbl3"))
+      // Inspect the default column values.
+      val db1tbl3 = catalog.externalCatalog.getTable("db1", "tbl3")
+      val currentDefault = 
ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY
+
+      def findField(name: String, schema: StructType): StructField =
+        schema.fields.filter(_.name == name).head
+      val columnA: StructField = findField("a", db1tbl3.schema)
+      val columnB: StructField = findField("b", db1tbl3.schema)
+      val columnC: StructField = findField("c", db1tbl3.schema)
+      val columnD: StructField = findField("d", db1tbl3.schema)
+      val columnE: StructField = findField("e", db1tbl3.schema)
+
+      val defaultValueColumnA: String = 
columnA.metadata.getString(currentDefault)
+      val defaultValueColumnB: String = 
columnB.metadata.getString(currentDefault)
+      val defaultValueColumnC: String = 
columnC.metadata.getString(currentDefault)
+      val defaultValueColumnD: String = 
columnD.metadata.getString(currentDefault)
+      val defaultValueColumnE: String = 
columnE.metadata.getString(currentDefault)
+
+      assert(defaultValueColumnA == "42")
+      assert(defaultValueColumnB == "\"abc\"")
+      assert(defaultValueColumnC == "_@#$%")
+      assert(defaultValueColumnD == "(select min(x) from badtable)")
+      assert(defaultValueColumnE == "41 + 1")
+
+      // Analyze the default column values.
+      val analyzer = new Analyzer(new SessionCatalog(new InMemoryCatalog, 
FunctionRegistry.builtin))
+      val statementType = "CREATE TABLE"
+      assert(ResolveDefaultColumns.analyze(analyzer, columnA, 
statementType).sql == "42")
+      assert(ResolveDefaultColumns.analyze(analyzer, columnB, 
statementType).sql == "'abc'")
+      assert(intercept[AnalysisException] {
+        ResolveDefaultColumns.analyze(analyzer, columnC, statementType)
+      }.getMessage.contains("fails to parse as a valid expression"))
+      assert(intercept[AnalysisException] {
+        ResolveDefaultColumns.analyze(analyzer, columnD, statementType)
+      }.getMessage.contains("fails to resolve as a valid expression"))
+      assert(intercept[AnalysisException] {
+        ResolveDefaultColumns.analyze(analyzer, columnE, statementType)
+      }.getMessage.contains("statement provided a value of incompatible type"))
+
+      // Make sure that constant-folding default values does not take place 
when the feature is
+      // disabled.
+      withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+        val result: StructType = 
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
+          analyzer, db1tbl3.schema, "CREATE TABLE")
+        val columnEWithFeatureDisabled: StructField = findField("e", result)
+        // No constant-folding has taken place to the EXISTS_DEFAULT metadata.
+        assert(!columnEWithFeatureDisabled.metadata.contains("EXISTS_DEFAULT"))
+      }
+    }
+  }
+
   test("create databases using invalid names") {
     withEmptyCatalog { catalog =>
       testInvalidName(
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 493e6b7..e6ae073 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -22,12 +22,13 @@ import java.util.Locale
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
-import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.{TableSpec => 
LogicalTableSpec, _}
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
 import 
org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, 
BucketTransform, DaysTransform, FieldReference, HoursTransform, 
IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
 import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{IntegerType, LongType, StringType, 
StructType, TimestampType}
+import org.apache.spark.sql.types.{IntegerType, LongType, MetadataBuilder, 
StringType, StructType, TimestampType}
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 class DDLParserSuite extends AnalysisTest {
@@ -2240,20 +2241,45 @@ class DDLParserSuite extends AnalysisTest {
   }
 
   test("SPARK-38335: Implement parser support for DEFAULT values for columns 
in tables") {
-    // The following commands will support DEFAULT columns, but this has not 
been implemented yet.
-    for (sql <- Seq(
-      "ALTER TABLE t1 ADD COLUMN x int NOT NULL DEFAULT 42",
-      "ALTER TABLE t1 ALTER COLUMN a.b.c SET DEFAULT 42",
-      "ALTER TABLE t1 ALTER COLUMN a.b.c DROP DEFAULT",
-      "ALTER TABLE t1 REPLACE COLUMNS (x STRING DEFAULT 42)",
-      "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING NOT NULL DEFAULT 
\"abc\") USING parquet",
-      "REPLACE TABLE my_tab(a INT COMMENT 'test', b STRING NOT NULL DEFAULT 
\"xyz\") USING parquet"
-    )) {
-      val exc = intercept[ParseException] {
-        parsePlan(sql);
-      }
-      assert(exc.getMessage.contains("Support for DEFAULT column values is not 
implemented yet"));
+    // The following ALTER TABLE commands will support DEFAULT columns, but 
this has not been
+    // implemented yet.
+    val unsupportedError = "Support for DEFAULT column values is not 
implemented yet"
+    assert(intercept[ParseException] {
+      parsePlan("ALTER TABLE t1 ADD COLUMN x int NOT NULL DEFAULT 42")
+    }.getMessage.contains(unsupportedError))
+    assert(intercept[ParseException] {
+      parsePlan("ALTER TABLE t1 ALTER COLUMN a.b.c SET DEFAULT 42")
+    }.getMessage.contains(unsupportedError))
+    assert(intercept[ParseException] {
+      parsePlan("ALTER TABLE t1 ALTER COLUMN a.b.c DROP DEFAULT")
+    }.getMessage.contains(unsupportedError))
+    assert(intercept[ParseException] {
+      parsePlan("ALTER TABLE t1 REPLACE COLUMNS (x STRING DEFAULT 42)")
+    }.getMessage.contains(unsupportedError))
+    // These CREATE/REPLACE TABLE statements should parse successfully.
+    val schemaWithDefaultColumn = new StructType()
+      .add("a", IntegerType, true)
+      .add("b", StringType, false,
+        new MetadataBuilder()
+          
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "\"abc\"")
+          .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, 
"\"abc\"").build())
+    comparePlans(parsePlan(
+      "CREATE TABLE my_tab(a INT, b STRING NOT NULL DEFAULT \"abc\") USING 
parquet"),
+      CreateTable(UnresolvedDBObjectName(Seq("my_tab"), false), 
schemaWithDefaultColumn,
+        Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], 
Some("parquet"),
+          Map.empty[String, String], None, None, None, false), false))
+    comparePlans(parsePlan("REPLACE TABLE my_tab(a INT, " +
+      "b STRING NOT NULL DEFAULT \"abc\") USING parquet"),
+      ReplaceTable(UnresolvedDBObjectName(Seq("my_tab"), false), 
schemaWithDefaultColumn,
+        Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], 
Some("parquet"),
+          Map.empty[String, String], None, None, None, false), false))
+    // Make sure that the parser returns an exception when the feature is 
disabled.
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+      intercept(
+        "CREATE TABLE my_tab(a INT, b STRING NOT NULL DEFAULT \"abc\") USING 
parquet",
+        "Support for DEFAULT column values is not allowed")
     }
+
     // In each of the following cases, the DEFAULT reference parses as an 
unresolved attribute
     // reference. We can handle these cases after the parsing stage, at later 
phases of analysis.
     comparePlans(parsePlan("VALUES (1, 2, DEFAULT) AS val"),
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index c0b00a4..d179079 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
DynamicPruning, Expression, NamedExpression, Not, Or, PredicateHelper, 
SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.{toPrettySQL, V2ExpressionBuilder}
+import org.apache.spark.sql.catalyst.util.{toPrettySQL, ResolveDefaultColumns, 
V2ExpressionBuilder}
 import org.apache.spark.sql.connector.catalog.{Identifier, 
StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, 
SupportsWrite, Table, TableCapability, TableCatalog}
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference}
@@ -41,6 +41,7 @@ import 
org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import 
org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource,
 WriteToContinuousDataSourceExec}
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.StorageLevel
 
@@ -168,7 +169,10 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
 
     case CreateTable(ResolvedDBObjectName(catalog, ident), schema, 
partitioning,
         tableSpec, ifNotExists) =>
-      CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema,
+      val newSchema: StructType =
+        ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
+          session.sessionState.analyzer, schema, "CREATE TABLE")
+      CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, newSchema,
         partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
 
     case CreateTableAsSelect(ResolvedDBObjectName(catalog, ident), parts, 
query, tableSpec,
@@ -187,12 +191,15 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
       RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil
 
     case ReplaceTable(ResolvedDBObjectName(catalog, ident), schema, parts, 
tableSpec, orCreate) =>
+      val newSchema: StructType =
+        ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
+          session.sessionState.analyzer, schema, "CREATE TABLE")
       catalog match {
         case staging: StagingTableCatalog =>
-          AtomicReplaceTableExec(staging, ident.asIdentifier, schema, parts,
+          AtomicReplaceTableExec(staging, ident.asIdentifier, newSchema, parts,
             qualifyLocInTableSpec(tableSpec), orCreate = orCreate, 
invalidateCache) :: Nil
         case _ =>
-          ReplaceTableExec(catalog.asTableCatalog, ident.asIdentifier, schema, 
parts,
+          ReplaceTableExec(catalog.asTableCatalog, ident.asIdentifier, 
newSchema, parts,
             qualifyLocInTableSpec(tableSpec), orCreate = orCreate, 
invalidateCache) :: Nil
       }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index b64ed08..6f14b1f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import 
org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, 
NoSuchDatabaseException, NoSuchNamespaceException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, 
ResolveDefaultColumns}
 import org.apache.spark.sql.connector.catalog._
 import 
org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import 
org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
@@ -614,15 +614,21 @@ class DataSourceV2SQLSuite
     val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
     assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
 
-    spark.sql("REPLACE TABLE testcat.table_name (id bigint NOT NULL) USING 
foo")
+    spark.sql("REPLACE TABLE testcat.table_name (id bigint NOT NULL DEFAULT 41 
+ 1) USING foo")
     val replaced = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
 
     assert(replaced.asInstanceOf[InMemoryTable].rows.isEmpty,
         "Replaced table should have no rows after committing.")
     assert(replaced.schema().fields.length === 1,
         "Replaced table should have new schema.")
-    assert(replaced.schema().fields(0) === StructField("id", LongType, 
nullable = false),
-      "Replaced table should have new schema.")
+    val actual = replaced.schema().fields(0)
+    val expected = StructField("id", LongType, nullable = false,
+      new MetadataBuilder().putString(
+        ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "41 + 1")
+        .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, 
"CAST(42 AS BIGINT)")
+        .build())
+    assert(actual === expected,
+      "Replaced table should have new schema with DEFAULT column metadata.")
   }
 
   test("ReplaceTableAsSelect: CREATE OR REPLACE new table has same behavior as 
CTAS.") {
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index 9e29386..4dc08a0e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -869,25 +869,25 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
             |SORTED BY (s1)
             |INTO 200 BUCKETS
             |STORED AS PARQUET
-          """.stripMargin
+        """.stripMargin
         } else {
           """
-             |CREATE TABLE test1(
-             |v1 BIGINT,
-             |s1 INT)
-             |USING PARQUET
-             |PARTITIONED BY (pk BIGINT)
-             |CLUSTERED BY (v1)
-             |SORTED BY (s1)
-             |INTO 200 BUCKETS
-          """.stripMargin
+            |CREATE TABLE test1(
+            |v1 BIGINT,
+            |s1 INT)
+            |USING PARQUET
+            |PARTITIONED BY (pk BIGINT)
+            |CLUSTERED BY (v1)
+            |SORTED BY (s1)
+            |INTO 200 BUCKETS
+        """.stripMargin
         }
 
         val insertString =
           """
             |INSERT INTO test1
             |SELECT * FROM VALUES(1,1,1)
-          """.stripMargin
+        """.stripMargin
 
         val dropString = "DROP TABLE IF EXISTS test1"
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to