This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a364cc0  [SPARK-38336][SQL] Support INSERT INTO commands into tables with DEFAULT columns
a364cc0 is described below

commit a364cc01716279d19eb5b43bde4961b04f5102af
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Thu Mar 31 14:29:58 2022 +0800

    [SPARK-38336][SQL] Support INSERT INTO commands into tables with DEFAULT columns
    
    ### What changes were proposed in this pull request?
    
    Extend INSERT INTO statements to support omitting values for columns that have
    defaults, or referring to those defaults explicitly with the DEFAULT keyword; in
    either case the Spark analyzer automatically substitutes the appropriate default
    values in the right places.
    
    Example:
    ```
    CREATE TABLE T(a INT DEFAULT 4, b INT NOT NULL DEFAULT 5);
    INSERT INTO T VALUES (1);
    INSERT INTO T VALUES (1, DEFAULT);
    INSERT INTO T VALUES (DEFAULT, 6);
    SELECT * FROM T;
    (1, 5)
    (1, 5)
    (4, 6)
    ```
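    
    When the new internal configuration added by this patch is enabled (it defaults to
    false; see the SQLConf change below), INSERT INTO may also omit trailing columns
    that have no explicit DEFAULT, and NULL values are appended in their place. A
    sketch based on the new InsertSuite tests (the table name U is illustrative):
    ```
    SET spark.sql.defaultColumn.useNullsForMissingDefaultValues=true;
    CREATE TABLE U(i BOOLEAN, s BIGINT) USING PARQUET;
    INSERT INTO U VALUES (true);
    SELECT s FROM U WHERE i = true;
    (NULL)
    ```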
    
    ### Why are the changes needed?
    
    This helps users issue INSERT INTO statements with less effort, and it lets people
    who create or update tables add custom optional columns that writers then populate
    only in the specific circumstances that need them.
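    
    As a concrete illustration (mirroring the "exist default" walkthrough in the code
    comments of ResolveDefaultColumnsUtil.scala below), a table owner can add an
    optional column after data already exists, without backfilling earlier rows:
    ```
    CREATE TABLE t (c1 INT);
    INSERT INTO t VALUES (42);
    ALTER TABLE t ADD COLUMNS (c2 INT DEFAULT 43);
    SELECT c1, c2 FROM t;
    (42, 43)
    ```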
    
    ### How was this patch tested?
    
    This change is covered by new and existing unit tests, as well as new INSERT INTO
    query test cases covering a variety of positive and negative scenarios.
    
    Closes #35982 from dtenedor/default-columns-insert-into.
    
    Authored-by: Daniel Tenedorio <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |   1 +
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   1 +
 .../catalyst/analysis/ResolveDefaultColumns.scala  | 250 ++++++++++++++++++++
 .../spark/sql/catalyst/parser/AstBuilder.scala     |   5 +
 .../sql/catalyst/rules/RuleIdCollection.scala      |   1 +
 ...lumns.scala => ResolveDefaultColumnsUtil.scala} |  52 +++--
 .../spark/sql/errors/QueryParsingErrors.scala      |   5 +
 .../org/apache/spark/sql/internal/SQLConf.scala    |  14 ++
 .../org/apache/spark/sql/SQLInsertTestSuite.scala  |  30 ++-
 .../org/apache/spark/sql/sources/InsertSuite.scala | 258 ++++++++++++++++++++-
 .../org/apache/spark/sql/hive/InsertSuite.scala    |  91 ++++----
 11 files changed, 638 insertions(+), 70 deletions(-)

diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 17c3395..872ea53 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -322,6 +322,7 @@ partitionSpec
 
 partitionVal
     : identifier (EQ constant)?
+    | identifier EQ DEFAULT
     ;
 
 namespace
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f69f17d..bd437c3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -311,6 +311,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       ResolveAggregateFunctions ::
       TimeWindowing ::
       SessionWindowing ::
+      ResolveDefaultColumns(this, v1SessionCatalog) ::
       ResolveInlineTables ::
       ResolveLambdaVariables ::
       ResolveTimeZone ::
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
new file mode 100644
index 0000000..f4502c9
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{SessionCatalog, UnresolvedCatalogRelation}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This is a rule to process DEFAULT column references in INSERT INTO statements.
+ *
+ * Background: CREATE TABLE and ALTER TABLE invocations support setting column default values for
+ * later operations. Subsequent INSERT and MERGE commands may then reference the values using the
+ * DEFAULT keyword as needed.
+ *
+ * Example:
+ * CREATE TABLE T(a INT DEFAULT 4, b INT NOT NULL DEFAULT 5);
+ * INSERT INTO T VALUES (1, 2);
+ * INSERT INTO T VALUES (1, DEFAULT);
+ * INSERT INTO T VALUES (DEFAULT, 6);
+ * SELECT * FROM T;
+ * (1, 2)
+ * (1, 5)
+ * (4, 6)
+ *
+ * @param analyzer analyzer to use for processing DEFAULT values stored as text.
+ * @param catalog  the catalog to use for looking up the schema of INSERT INTO table objects.
+ */
+case class ResolveDefaultColumns(
+  analyzer: Analyzer,
+  catalog: SessionCatalog) extends Rule[LogicalPlan] {
+
+  // This field stores the enclosing INSERT INTO command, once we find one.
+  var enclosingInsert: Option[InsertIntoStatement] = None
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+    (_ => SQLConf.get.enableDefaultColumns), ruleId) {
+    case i@InsertIntoStatement(_, _, _, _, _, _)
+      if i.query.collectFirst { case u: UnresolvedInlineTable => u }.isDefined =>
+      enclosingInsert = Some(i)
+      i
+
+    case table: UnresolvedInlineTable
+      if enclosingInsert.isDefined &&
+        table.rows.nonEmpty && table.rows.forall(_.size == table.rows(0).size) =>
+      val expanded: UnresolvedInlineTable = addMissingDefaultColumnValues(table).getOrElse(table)
+      replaceExplicitDefaultColumnValues(analyzer, expanded).getOrElse(table)
+
+    case i@InsertIntoStatement(_, _, _, project: Project, _, _) =>
+      enclosingInsert = Some(i)
+      val expanded: Project = addMissingDefaultColumnValues(project).getOrElse(project)
+      val replaced: Option[LogicalPlan] = replaceExplicitDefaultColumnValues(analyzer, expanded)
+      if (replaced.isDefined) i.copy(query = replaced.get) else i
+  }
+
+  // Helper method to check if an expression is an explicit DEFAULT column reference.
+  private def isExplicitDefaultColumn(expr: Expression): Boolean = expr match {
+    case u: UnresolvedAttribute if u.name.equalsIgnoreCase(CURRENT_DEFAULT_COLUMN_NAME) => true
+    case _ => false
+  }
+
+  /**
+   * Updates an inline table to generate missing default column values.
+   */
+  private def addMissingDefaultColumnValues(
+    table: UnresolvedInlineTable): Option[UnresolvedInlineTable] = {
+    assert(enclosingInsert.isDefined)
+    val numQueryOutputs: Int = table.rows(0).size
+    val schema = getInsertTableSchemaWithoutPartitionColumns.getOrElse(return None)
+    val newDefaultExpressions: Seq[Expression] = getDefaultExpressions(numQueryOutputs, schema)
+    val newNames: Seq[String] = schema.fields.drop(numQueryOutputs).map { _.name }
+    if (newDefaultExpressions.nonEmpty) {
+      Some(table.copy(
+        names = table.names ++ newNames,
+        rows = table.rows.map { row => row ++ newDefaultExpressions }))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Adds new expressions to a projection to generate missing default column values.
+   */
+  private def addMissingDefaultColumnValues(project: Project): Option[Project] = {
+    val numQueryOutputs: Int = project.projectList.size
+    val schema = getInsertTableSchemaWithoutPartitionColumns.getOrElse(return None)
+    val newDefaultExpressions: Seq[Expression] = getDefaultExpressions(numQueryOutputs, schema)
+    val newAliases: Seq[NamedExpression] =
+      newDefaultExpressions.zip(schema.fields).map {
+        case (expr, field) => Alias(expr, field.name)()
+      }
+    if (newDefaultExpressions.nonEmpty) {
+      Some(project.copy(projectList = project.projectList ++ newAliases))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * This is a helper for the addMissingDefaultColumnValues methods above.
+   */
+  private def getDefaultExpressions(numQueryOutputs: Int, schema: StructType): Seq[Expression] = {
+    val remainingFields: Seq[StructField] = schema.fields.drop(numQueryOutputs)
+    val numDefaultExpressionsToAdd: Int = {
+      if (SQLConf.get.useNullsForMissingDefaultColumnValues) {
+        remainingFields.size
+      } else {
+        remainingFields.takeWhile(_.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)).size
+      }
+    }
+    Seq.fill(numDefaultExpressionsToAdd)(UnresolvedAttribute(CURRENT_DEFAULT_COLUMN_NAME))
+  }
+
+  /**
+   * Replaces unresolved DEFAULT column references with corresponding values in a logical plan.
+   */
+  private def replaceExplicitDefaultColumnValues(
+    analyzer: Analyzer,
+    input: LogicalPlan): Option[LogicalPlan] = {
+    assert(enclosingInsert.isDefined)
+    val schema = getInsertTableSchemaWithoutPartitionColumns.getOrElse(return None)
+    val columnNames: Seq[String] = schema.fields.map { _.name }
+    val defaultExpressions: Seq[Expression] = schema.fields.map {
+      case f if f.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY) =>
+        analyze(analyzer, f, "INSERT")
+      case _ => Literal(null)
+    }
+    // Check the type of `input` and replace its expressions accordingly.
+    // If necessary, return a more descriptive error message if the user tries to nest the DEFAULT
+    // column reference inside some other expression, such as DEFAULT + 1 (this is not allowed).
+    //
+    // Note that we don't need to check "SQLConf.get.useNullsForMissingDefaultColumnValues" after
+    // this point because this method is only responsible for replacing *existing* DEFAULT
+    // references. In contrast, the "getDefaultExpressions" method checks that config and adds new
+    // NULLs if needed.
+    input match {
+      case table: UnresolvedInlineTable =>
+        replaceExplicitDefaultColumnValues(defaultExpressions, table)
+      case project: Project =>
+        replaceExplicitDefaultColumnValues(defaultExpressions, columnNames, project)
+    }
+  }
+
+  /**
+   * Replaces unresolved DEFAULT column references with corresponding values in an inline table.
+   */
+  private def replaceExplicitDefaultColumnValues(
+    defaultExpressions: Seq[Expression],
+    table: UnresolvedInlineTable): Option[LogicalPlan] = {
+    var replaced = false
+    val newRows: Seq[Seq[Expression]] = {
+      table.rows.map { row: Seq[Expression] =>
+        for {
+          i <- 0 until row.size
+          expr = row(i)
+          defaultExpr = if (i < defaultExpressions.size) defaultExpressions(i) else Literal(null)
+        } yield expr match {
+          case u: UnresolvedAttribute if isExplicitDefaultColumn(u) =>
+            replaced = true
+            defaultExpr
+          case expr@_ if expr.find { isExplicitDefaultColumn }.isDefined =>
+            throw new AnalysisException(DEFAULTS_IN_EXPRESSIONS_ERROR)
+          case _ => expr
+        }
+      }
+    }
+    if (replaced) {
+      Some(table.copy(rows = newRows))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Replaces unresolved DEFAULT column references with corresponding values in a projection.
+   */
+  private def replaceExplicitDefaultColumnValues(
+    defaultExpressions: Seq[Expression],
+    colNames: Seq[String],
+    project: Project): Option[LogicalPlan] = {
+    var replaced = false
+    val updated: Seq[NamedExpression] = {
+      for {
+        i <- 0 until project.projectList.size
+        projectExpr = project.projectList(i)
+        defaultExpr = if (i < defaultExpressions.size) defaultExpressions(i) else Literal(null)
+        colName = if (i < colNames.size) colNames(i) else ""
+      } yield projectExpr match {
+        case Alias(u: UnresolvedAttribute, _) if isExplicitDefaultColumn(u) =>
+          replaced = true
+          Alias(defaultExpr, colName)()
+        case u: UnresolvedAttribute if isExplicitDefaultColumn(u) =>
+          replaced = true
+          Alias(defaultExpr, colName)()
+        case expr@_ if expr.find { isExplicitDefaultColumn }.isDefined =>
+          throw new AnalysisException(DEFAULTS_IN_EXPRESSIONS_ERROR)
+        case _ => projectExpr
+      }
+    }
+    if (replaced) {
+      Some(project.copy(projectList = updated))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Looks up the schema for the table object of an INSERT INTO statement from the catalog.
+   */
+  private def getInsertTableSchemaWithoutPartitionColumns: Option[StructType] = {
+    assert(enclosingInsert.isDefined)
+    try {
+      val tableName = enclosingInsert.get.table match {
+        case r: UnresolvedRelation => TableIdentifier(r.name)
+        case r: UnresolvedCatalogRelation => r.tableMeta.identifier
+        case _ => return None
+      }
+      val lookup = catalog.lookupRelation(tableName)
+      lookup match {
+        case SubqueryAlias(_, r: UnresolvedCatalogRelation) =>
+          Some(StructType(r.tableMeta.schema.fields.dropRight(
+            enclosingInsert.get.partitionSpec.size)))
+        case _ => None
+      }
+    } catch {
+      case _: NoSuchTableException => None
+    }
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index c7925c9..c435e31 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -480,6 +480,11 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     val legacyNullAsString =
       conf.getConf(SQLConf.LEGACY_PARSE_NULL_PARTITION_SPEC_AS_STRING_LITERAL)
     val parts = ctx.partitionVal.asScala.map { pVal =>
+      // Check if the query attempted to refer to a DEFAULT column value within the PARTITION
+      // clause and return a specific error to help guide the user, since this is not allowed.
+      if (pVal.DEFAULT != null) {
+        throw QueryParsingErrors.defaultColumnReferencesNotAllowedInPartitionSpec(ctx)
+      }
       val name = pVal.identifier.getText
       val value = Option(pVal.constant).map(v => visitStringConstant(v, legacyNullAsString))
       name -> value
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index e36a76b..1204fa8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -81,6 +81,7 @@ object RuleIdCollection {
       "org.apache.spark.sql.catalyst.analysis.DeduplicateRelations" ::
       "org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases" ::
       "org.apache.spark.sql.catalyst.analysis.EliminateUnions" ::
+      "org.apache.spark.sql.catalyst.analysis.ResolveDefaultColumns" ::
       
"org.apache.spark.sql.catalyst.analysis.ResolveExpressionsWithNamePlaceholders" 
::
       
"org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveCoalesceHints" ::
       
"org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveJoinStrategyHints" 
::
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
similarity index 78%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumns.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 1e40756..b7db9be 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -38,20 +38,28 @@ object ResolveDefaultColumns {
   val CURRENT_DEFAULT_COLUMN_METADATA_KEY = "CURRENT_DEFAULT"
  // This column metadata represents the default value for all existing rows in a table after a
  // column has been added. This value is determined at time of CREATE TABLE, REPLACE TABLE, or
-  // ALTER TABLE ADD COLUMN, and never changes thereafter. The intent is for this "exist default"
-  // to be used by any scan when the columns in the source row are missing data. For example,
-  // consider the following sequence:
+  // ALTER TABLE ADD COLUMN, and never changes thereafter. The intent is for this "exist default" to
+  // be used by any scan when the columns in the source row are missing data. For example, consider
+  // the following sequence:
   // CREATE TABLE t (c1 INT)
   // INSERT INTO t VALUES (42)
   // ALTER TABLE t ADD COLUMNS (c2 INT DEFAULT 43)
   // SELECT c1, c2 FROM t
  // In this case, the final query is expected to return 42, 43. The ALTER TABLE ADD COLUMNS command
-  // executed after there was already data in the table, so in order to enforce this invariant,
-  // we need either (1) an expensive backfill of value 43 at column c2 into all previous rows, or
-  // (2) indicate to each data source that selected columns missing data are to generate the
+  // executed after there was already data in the table, so in order to enforce this invariant, we
+  // need either (1) an expensive backfill of value 43 at column c2 into all previous rows, or (2)
+  // indicate to each data source that selected columns missing data are to generate the
  // corresponding DEFAULT value instead. We choose option (2) for efficiency, and represent this
  // value as the text representation of a folded constant in the "EXISTS_DEFAULT" column metadata.
   val EXISTS_DEFAULT_COLUMN_METADATA_KEY = "EXISTS_DEFAULT"
+  // Name of attributes representing explicit references to the value stored in the above
+  // CURRENT_DEFAULT_COLUMN_METADATA.
+  val CURRENT_DEFAULT_COLUMN_NAME = "DEFAULT"
+  // Return a more descriptive error message if the user tries to nest the DEFAULT column reference
+  // inside some other expression, such as DEFAULT + 1 (this is not allowed).
+  val DEFAULTS_IN_EXPRESSIONS_ERROR = "Failed to execute INSERT INTO command because the " +
+    "VALUES list contains a DEFAULT column reference as part of another expression; this is " +
+    "not allowed"
 
   /**
    * Finds "current default" expressions in CREATE/REPLACE TABLE columns and constant-folds them.
@@ -75,8 +83,8 @@ object ResolveDefaultColumns {
    * data source then takes responsibility to provide the constant-folded value in the
    * EXISTS_DEFAULT metadata for such columns where the value is not present in storage.
    *
-   * @param analyzer used for analyzing the result of parsing the column expression stored as text.
-   * @param tableSchema represents the names and types of the columns of the statement to process.
+   * @param analyzer      used for analyzing the result of parsing the expression stored as text.
+   * @param tableSchema   represents the names and types of the columns of the statement to process.
    * @param statementType name of the statement being processed, such as INSERT; useful for errors.
    * @return a copy of `tableSchema` with field metadata updated with the constant-folded values.
    */
@@ -84,26 +92,28 @@ object ResolveDefaultColumns {
       analyzer: Analyzer,
       tableSchema: StructType,
       statementType: String): StructType = {
-    if (!SQLConf.get.enableDefaultColumns) {
-      return tableSchema
-    }
-    val newFields: Seq[StructField] = tableSchema.fields.map { field =>
-      if (field.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
-        val analyzed: Expression = analyze(analyzer, field, statementType)
-        val newMetadata: Metadata = new MetadataBuilder().withMetadata(field.metadata)
-          .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, analyzed.sql).build()
-        field.copy(metadata = newMetadata)
-      } else {
-        field
+    if (SQLConf.get.enableDefaultColumns) {
+      val newFields: Seq[StructField] = tableSchema.fields.map { field =>
+        if (field.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
+          val analyzed: Expression = analyze(analyzer, field, statementType)
+          val newMetadata: Metadata = new MetadataBuilder().withMetadata(field.metadata)
+            .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, analyzed.sql).build()
+          field.copy(metadata = newMetadata)
+        } else {
+          field
+        }
       }
+      StructType(newFields)
+    } else {
+      tableSchema
     }
-    StructType(newFields)
   }
 
   /**
    * Parses and analyzes the DEFAULT column text in `field`, returning an error upon failure.
    *
-   * @param field represents the DEFAULT column value whose "default" metadata to parse and analyze.
+   * @param field         represents the DEFAULT column value whose "default" metadata to parse
+   *                      and analyze.
    * @param statementType which type of statement we are running, such as INSERT; useful for errors.
    * @return Result of the analysis and constant-folding operation.
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index b13a530..69e118d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -452,4 +452,9 @@ object QueryParsingErrors {
   def defaultColumnNotEnabledError(ctx: ParserRuleContext): Throwable = {
     new ParseException("Support for DEFAULT column values is not allowed", ctx)
   }
+
+  def defaultColumnReferencesNotAllowedInPartitionSpec(ctx: ParserRuleContext): Throwable = {
+    new ParseException(
+      "References to DEFAULT column values are not allowed within the PARTITION clause", ctx)
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3e1872a..9aad649 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2816,6 +2816,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES =
+    buildConf("spark.sql.defaultColumn.useNullsForMissingDefautValues")
+      .internal()
+      .doc("When true, and DEFAULT columns are enabled, allow column 
definitions lacking " +
+        "explicit default values to behave as if they had specified DEFAULT 
NULL instead. " +
+        "For example, this allows most INSERT INTO statements to specify only 
a prefix of the " +
+        "columns in the target table, and the remaining columns will receive 
NULL values.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(false)
+
  val ENFORCE_RESERVED_KEYWORDS = buildConf("spark.sql.ansi.enforceReservedKeywords")
    .doc(s"When true and '${ANSI_ENABLED.key}' is true, the Spark SQL parser enforces the ANSI " +
      "reserved keywords and forbids SQL queries that use reserved keywords as alias names " +
@@ -4318,6 +4329,9 @@ class SQLConf extends Serializable with Logging {
 
   def enableDefaultColumns: Boolean = getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)
 
+  def useNullsForMissingDefaultColumnValues: Boolean =
+    getConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES)
+
  def enforceReservedKeywords: Boolean = ansiEnabled && getConf(ENFORCE_RESERVED_KEYWORDS)
 
  def strictIndexOperator: Boolean = ansiEnabled && getConf(ANSI_STRICT_INDEX_OPERATOR)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
index fad01db..274da57 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
@@ -171,25 +171,36 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
   }
 
   test("insert with column list - mismatched column list size") {
-    val msg = "Cannot write to table due to mismatched user specified column size"
-    withTable("t1") {
-      val cols = Seq("c1", "c2", "c3")
-      createTable("t1", cols, Seq("int", "long", "string"))
-      val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2) values(1, 2, 3)"))
-      assert(e1.getMessage.contains(msg))
-      val e2 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2, c3) values(1, 2)"))
-      assert(e2.getMessage.contains(msg))
+    val msgs = Seq("Cannot write to table due to mismatched user specified column size",
+      "expected 3 columns but found")
+    def test: Unit = {
+      withTable("t1") {
+        val cols = Seq("c1", "c2", "c3")
+        createTable("t1", cols, Seq("int", "long", "string"))
+        val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2) values(1, 2, 3)"))
+        assert(e1.getMessage.contains(msgs(0)) || e1.getMessage.contains(msgs(1)))
+        val e2 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2, c3) values(1, 2)"))
+        assert(e2.getMessage.contains(msgs(0)) || e2.getMessage.contains(msgs(1)))
+      }
+    }
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+      test
+    }
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true") {
+      test
     }
   }
 
   test("insert with column list - mismatched target table out size after 
rewritten query") {
-    val v2Msg = "Cannot write to 'testcat.t1', not enough data columns:"
+    val v2Msg = "expected 2 columns but found"
     val cols = Seq("c1", "c2", "c3", "c4")
 
     withTable("t1") {
       createTable("t1", cols, Seq.fill(4)("int"))
       val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1) values(1)"))
       assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 1") ||
+        e1.getMessage.contains("expected 4 columns but found 1") ||
+        e1.getMessage.contains("not enough data columns") ||
         e1.getMessage.contains(v2Msg))
     }
 
@@ -199,6 +210,7 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
         sql(s"INSERT INTO t1 partition(c3=3, c4=4) (c1) values(1)")
       }
       assert(e1.getMessage.contains("target table has 4 column(s) but the 
inserted data has 3") ||
+        e1.getMessage.contains("not enough data columns") ||
         e1.getMessage.contains(v2Msg))
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 1fb4737..2483055 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -27,7 +27,9 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
@@ -405,7 +407,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
   }
 
   test("SPARK-15824 - Execute an INSERT wrapped in a WITH statement 
immediately") {
-    withTable("target", "target2") {
+    def test: Unit = withTable("target", "target2") {
       sql(s"CREATE TABLE target(a INT, b STRING) USING JSON")
       sql("WITH tbl AS (SELECT * FROM jt) INSERT OVERWRITE TABLE target SELECT 
a, b FROM tbl")
       checkAnswer(
@@ -426,6 +428,12 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
         sql("SELECT a, b FROM jt")
       )
     }
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true") {
+      test
+    }
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+      test
+    }
   }
 
   test("SPARK-21203 wrong results of insertion of Array of Struct") {
@@ -849,6 +857,254 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
+  test("INSERT INTO statements with tables with default columns: positive 
tests") {
+    // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is 
enabled, and no
+    // explicit DEFAULT value is available when the INSERT INTO statement 
provides fewer
+    // values than expected, NULL values are appended in their place.
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> 
"true") {
+      withTable("t") {
+        sql("create table t(i boolean, s bigint) using parquet")
+        sql("insert into t values(true)")
+        checkAnswer(sql("select s from t where i = true"), Seq(Row(null)))
+      }
+    }
+    // The default value for the DEFAULT keyword is the NULL literal.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint) using parquet")
+      sql("insert into t values(true, default)")
+      checkAnswer(sql("select s from t where i = true"), Seq(null).map(i => 
Row(i)))
+    }
+    // There is a complex expression in the default value.
+    withTable("t") {
+      sql("create table t(i boolean, s string default concat('abc', 'def')) 
using parquet")
+      sql("insert into t values(true, default)")
+      checkAnswer(sql("select s from t where i = true"), Seq("abcdef").map(i 
=> Row(i)))
+    }
+    // The default value parses correctly and the provided value type is different but coercible.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 42) using parquet")
+      sql("insert into t values(false)")
+      checkAnswer(sql("select s from t where i = false"), Seq(42L).map(i => Row(i)))
+    }
+    // There are two trailing default values referenced implicitly by the INSERT INTO statement.
+    withTable("t") {
+      sql("create table t(i int, s bigint default 42, x bigint default 43) using parquet")
+      sql("insert into t values(1)")
+      checkAnswer(sql("select s + x from t where i = 1"), Seq(85L).map(i => Row(i)))
+    }
+    // The table has a partitioning column and a default value is injected.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint, q int default 42 ) using 
parquet partitioned by (i)")
+      sql("insert into t partition(i='true') values(5, default)")
+      checkAnswer(sql("select s from t where i = true"), Seq(5).map(i => 
Row(i)))
+    }
+    // The table has a partitioning column and a default value is added per an explicit reference.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 42) using parquet partitioned by (i)")
+      sql("insert into t partition(i='true') values(default)")
+      checkAnswer(sql("select s from t where i = true"), Seq(42L).map(i => Row(i)))
+    }
+    // The default value parses correctly as a constant but non-literal expression.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 41 + 1) using parquet")
+      sql("insert into t values(false, default)")
+      checkAnswer(sql("select s from t where i = false"), Seq(42L).map(i => Row(i)))
+    }
+    // Explicit defaults may appear in different positions within the inline table provided as input
+    // to the INSERT INTO statement.
+    withTable("t") {
+      sql("create table t(i boolean default false, s bigint default 42) using parquet")
+      sql("insert into t values(false, default), (default, 42)")
+      checkAnswer(sql("select s from t where i = false"), Seq(42L, 42L).map(i => Row(i)))
+    }
+    // There is an explicit default value provided in the INSERT INTO statement in the VALUES,
+    // with an alias over the VALUES.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 42) using parquet")
+      sql("insert into t select * from values (false, default) as tab(col, other)")
+      checkAnswer(sql("select s from t where i = false"), Seq(42L).map(i => Row(i)))
+    }
+    // The explicit default value arrives first before the other value.
+    withTable("t") {
+      sql("create table t(i boolean default false, s bigint) using parquet")
+      sql("insert into t values (default, 43)")
+      checkAnswer(sql("select s from t where i = false"), Seq(43L).map(i => Row(i)))
+    }
+    // The 'create table' statement provides the default parameter first.
+    withTable("t") {
+      sql("create table t(i boolean default false, s bigint) using parquet")
+      sql("insert into t values (default, 43)")
+      checkAnswer(sql("select s from t where i = false"), Seq(43L).map(i => Row(i)))
+    }
+    // The explicit default value is provided in the wrong order (first instead of second), but
+    // this is OK because the provided default value evaluates to literal NULL.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 42) using parquet")
+      sql("insert into t values (default, 43)")
+      checkAnswer(sql("select s from t where i is null"), Seq(43L).map(i => Row(i)))
+    }
+    // There is an explicit default value provided in the INSERT INTO statement as a SELECT.
+    // This is supported.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 42) using parquet")
+      sql("insert into t select false, default")
+      checkAnswer(sql("select s from t where i = false"), Seq(42L).map(i => Row(i)))
+    }
+    // There is a complex query plan in the SELECT query in the INSERT INTO statement.
+    withTable("t") {
+      sql("create table t(i boolean default false, s bigint default 42) using parquet")
+      sql("insert into t select col, count(*) from values (default, default) " +
+        "as tab(col, other) group by 1")
+      checkAnswer(sql("select s from t where i = false"), Seq(1).map(i => Row(i)))
+    }
+    // The explicit default reference resolves successfully with nested table subqueries.
+    withTable("t") {
+      sql("create table t(i boolean default false, s bigint) using parquet")
+      sql("insert into t select * from (select * from values(default, 42))")
+      checkAnswer(sql("select s from t where i = false"), Seq(42L).map(i => Row(i)))
+    }
+    // There are three column types exercising various combinations of implicit and explicit
+    // default column value references in the 'insert into' statements. Note these tests depend on
+    // enabling the configuration to use NULLs for missing DEFAULT column values.
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
+      withTable("t1", "t2") {
+        sql("create table t1(j int, s bigint default 42, x bigint default 43) using parquet")
+        sql("insert into t1 values(1)")
+        sql("insert into t1 values(2, default)")
+        sql("insert into t1 values(3, default, default)")
+        sql("insert into t1 values(4, 44)")
+        sql("insert into t1 values(5, 44, 45)")
+        sql("create table t2(j int, s bigint default 42, x bigint default 43) 
using parquet")
+        sql("insert into t2 select j from t1 where j = 1")
+        sql("insert into t2 select j, default from t1 where j = 2")
+        sql("insert into t2 select j, default, default from t1 where j = 3")
+        sql("insert into t2 select j, s from t1 where j = 4")
+        sql("insert into t2 select j, s, default from t1 where j = 5")
+        val resultSchema = new StructType()
+          .add("s", LongType, false)
+          .add("x", LongType, false)
+        checkAnswer(
+          sql("select j, s, x from t2 order by j, s, x"),
+          Seq(
+            new GenericRowWithSchema(Array(1, 42L, 43L), resultSchema),
+            new GenericRowWithSchema(Array(2, 42L, 43L), resultSchema),
+            new GenericRowWithSchema(Array(3, 42L, 43L), resultSchema),
+            new GenericRowWithSchema(Array(4, 44L, 43L), resultSchema),
+            new GenericRowWithSchema(Array(5, 44L, 43L), resultSchema)))
+      }
+    }
+  }
+
+  test("INSERT INTO statements with tables with default columns: negative 
tests") {
+    object Errors {
+      val COMMON_SUBSTRING = " has a DEFAULT value"
+      val COLUMN_DEFAULT_NOT_FOUND = "Column 'default' does not exist"
+      val BAD_SUBQUERY = "cannot evaluate expression scalarsubquery() in 
inline table definition"
+    }
+    // The default value fails to analyze.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default badvalue) using parquet")
+      assert(intercept[AnalysisException] {
+        sql("insert into t values (default, default)")
+      }.getMessage.contains(Errors.COMMON_SUBSTRING))
+    }
+    // The default value analyzes to a table not in the catalog.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default (select min(x) from 
badtable)) using parquet")
+      assert(intercept[AnalysisException] {
+        sql("insert into t values (default, default)")
+      }.getMessage.contains(Errors.COMMON_SUBSTRING))
+    }
+    // The default value parses but refers to a table from the catalog.
+    withTable("t", "other") {
+      sql("create table other(x string) using parquet")
+      sql("create table t(i boolean, s bigint default (select min(x) from 
other)) using parquet")
+      assert(intercept[AnalysisException] {
+        sql("insert into t values (default, default)")
+      }.getMessage.contains(Errors.COMMON_SUBSTRING))
+    }
+    // The default value has an explicit alias. It fails to evaluate when inlined into the VALUES
+    // list at the INSERT INTO time.
+    withTable("t") {
+      sql("create table t(i boolean default (select false as alias), s bigint) using parquet")
+      assert(intercept[AnalysisException] {
+        sql("insert into t values (default, default)")
+      }.getMessage.contains(Errors.BAD_SUBQUERY))
+    }
+    // Explicit default values may not participate in complex expressions in the VALUES list.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 42) using parquet")
+      assert(intercept[AnalysisException] {
+        sql("insert into t values(false, default + 1)")
+      }.getMessage.contains(ResolveDefaultColumns.DEFAULTS_IN_EXPRESSIONS_ERROR))
+    }
+    // Explicit default values may not participate in complex expressions in the SELECT query.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 42) using parquet")
+      assert(intercept[AnalysisException] {
+        sql("insert into t select false, default + 1")
+      }.getMessage.contains(ResolveDefaultColumns.DEFAULTS_IN_EXPRESSIONS_ERROR))
+    }
+    // Explicit default values have a reasonable error path if the table is not found.
+    withTable("t") {
+      assert(intercept[AnalysisException] {
+        sql("insert into t values(false, default)")
+      }.getMessage.contains(Errors.COLUMN_DEFAULT_NOT_FOUND))
+    }
+    // The default value parses but the type is not coercible.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default false) using parquet")
+      assert(intercept[AnalysisException] {
+        sql("insert into t values (default, default)")
+      }.getMessage.contains("provided a value of incompatible type"))
+    }
+    // The number of columns in the INSERT INTO statement is greater than the number of columns in
+    // the table.
+    withTable("t") {
+      sql("create table num_data(id int, val decimal(38,10)) using parquet")
+      sql("create table t(id1 int, int2 int, result decimal(38,10)) using parquet")
+      assert(intercept[AnalysisException] {
+        sql("insert into t select t1.id, t2.id, t1.val, t2.val, t1.val * t2.val " +
+          "from num_data t1, num_data t2")
+      }.getMessage.contains(
+        "requires that the data to be inserted have the same number of columns as the target"))
+    }
+    // The default value is disabled per configuration.
+    withTable("t") {
+      withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+        assert(intercept[AnalysisException] {
+          sql("create table t(i boolean, s bigint default 42L) using parquet")
+        }.getMessage.contains("Support for DEFAULT column values is not 
allowed"))
+      }
+    }
+    // There is one trailing default value referenced implicitly by the INSERT INTO statement.
+    withTable("t") {
+      sql("create table t(i int, s bigint default 42, x bigint) using parquet")
+      assert(intercept[AnalysisException] {
+        sql("insert into t values(1)")
+      }.getMessage.contains("expected 3 columns but found"))
+    }
+    // The table has a partitioning column with a default value; this is not allowed.
+    withTable("t") {
+      sql("create table t(i boolean default true, s bigint, q int default 42 ) " +
+        "using parquet partitioned by (i)")
+      assert(intercept[ParseException] {
+        sql("insert into t partition(i=default) values(5, default)")
+      }.getMessage.contains(
+        "References to DEFAULT column values are not allowed within the PARTITION clause"))
+    }
+    // The configuration option to append missing NULL values to the end of the INSERT INTO
+    // statement is not enabled.
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+      withTable("t") {
+        sql("create table t(i boolean, s bigint) using parquet")
+        assert(intercept[AnalysisException] {
+          sql("insert into t values(true)")
+        }.getMessage.contains("target table has 2 column(s) but the inserted data has 1 column(s)"))
+      }
+    }
+  }
+
   test("Stop task set if FileAlreadyExistsException was thrown") {
     Seq(true, false).foreach { fastFail =>
       withSQLConf("fs.file.impl" -> 
classOf[FileExistingTestFileSystem].getName,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index 4dc08a0e..a7148e9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -857,52 +857,65 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
   }
 
   test("SPARK-35531: Insert data with different cases of bucket column") {
-    withTable("test1") {
-      Seq(true, false).foreach { isHiveTable =>
-        val createSpark = if (isHiveTable) {
-          """
-            |CREATE TABLE TEST1(
-            |v1 BIGINT,
-            |s1 INT)
-            |PARTITIONED BY (pk BIGINT)
-            |CLUSTERED BY (v1)
-            |SORTED BY (s1)
-            |INTO 200 BUCKETS
-            |STORED AS PARQUET
-        """.stripMargin
-        } else {
-          """
-            |CREATE TABLE test1(
-            |v1 BIGINT,
-            |s1 INT)
-            |USING PARQUET
-            |PARTITIONED BY (pk BIGINT)
-            |CLUSTERED BY (v1)
-            |SORTED BY (s1)
-            |INTO 200 BUCKETS
-        """.stripMargin
-        }
+    def testDefaultColumn: Unit = {
+      withTable("test1") {
+        Seq(true, false).foreach { isHiveTable =>
+          val createSpark = if (isHiveTable) {
+            """
+              |CREATE TABLE TEST1(
+              |v1 BIGINT,
+              |s1 INT)
+              |PARTITIONED BY (pk BIGINT)
+              |CLUSTERED BY (v1)
+              |SORTED BY (s1)
+              |INTO 200 BUCKETS
+              |STORED AS PARQUET
+          """.stripMargin
+          } else {
+            """
+              |CREATE TABLE test1(
+              |v1 BIGINT,
+              |s1 INT)
+              |USING PARQUET
+              |PARTITIONED BY (pk BIGINT)
+              |CLUSTERED BY (v1)
+              |SORTED BY (s1)
+              |INTO 200 BUCKETS
+          """.stripMargin
+          }
 
-        val insertString =
-          """
-            |INSERT INTO test1
-            |SELECT * FROM VALUES(1,1,1)
-        """.stripMargin
+          val insertString =
+            """
+              |INSERT INTO test1
+              |SELECT * FROM VALUES(1,1,1)
+          """.stripMargin
 
-        val dropString = "DROP TABLE IF EXISTS test1"
+          val dropString = "DROP TABLE IF EXISTS test1"
 
-        sql(dropString)
-        sql(createSpark.toLowerCase(Locale.ROOT))
+          sql(dropString)
+          sql(createSpark.toLowerCase(Locale.ROOT))
 
-        sql(insertString.toLowerCase(Locale.ROOT))
-        sql(insertString.toUpperCase(Locale.ROOT))
+          sql(insertString.toLowerCase(Locale.ROOT))
+          sql(insertString.toUpperCase(Locale.ROOT))
 
-        sql(dropString)
-        sql(createSpark.toUpperCase(Locale.ROOT))
+          sql(dropString)
+          sql(createSpark.toUpperCase(Locale.ROOT))
 
-        sql(insertString.toLowerCase(Locale.ROOT))
-        sql(insertString.toUpperCase(Locale.ROOT))
+          sql(insertString.toLowerCase(Locale.ROOT))
+          sql(insertString.toUpperCase(Locale.ROOT))
+        }
       }
     }
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+      testDefaultColumn
+    }
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true",
+      SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+      testDefaultColumn
+    }
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true",
+      SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
+      testDefaultColumn
+    }
   }
 }
